Remove leftover TODO code around centralized healthmonitor
This commit is contained in:
parent
13447f439f
commit
fed5c543d4
|
@ -21,8 +21,6 @@ set(FDBCLIENT_SRCS
|
|||
FDBOptions.h
|
||||
FDBTypes.h
|
||||
FileBackupAgent.actor.cpp
|
||||
HealthMonitorClient.h
|
||||
HealthMonitorClient.actor.cpp
|
||||
HTTP.actor.cpp
|
||||
IClientApi.h
|
||||
JsonBuilder.cpp
|
||||
|
|
|
@ -36,7 +36,6 @@ struct ClusterInterface {
|
|||
RequestStream< ReplyPromise<Void> > ping;
|
||||
RequestStream< struct GetClientWorkersRequest > getClientWorkers;
|
||||
RequestStream< struct ForceRecoveryRequest > forceRecovery;
|
||||
RequestStream< struct HealthMonitoringRequest > healthMonitoring;
|
||||
|
||||
bool operator == (ClusterInterface const& r) const { return id() == r.id(); }
|
||||
bool operator != (ClusterInterface const& r) const { return id() != r.id(); }
|
||||
|
@ -49,8 +48,7 @@ struct ClusterInterface {
|
|||
databaseStatus.getFuture().isReady() ||
|
||||
ping.getFuture().isReady() ||
|
||||
getClientWorkers.getFuture().isReady() ||
|
||||
forceRecovery.getFuture().isReady() ||
|
||||
healthMonitoring.getFuture().isReady();
|
||||
forceRecovery.getFuture().isReady();
|
||||
}
|
||||
|
||||
void initEndpoints() {
|
||||
|
@ -60,13 +58,11 @@ struct ClusterInterface {
|
|||
ping.getEndpoint( TaskPriority::ClusterController );
|
||||
getClientWorkers.getEndpoint( TaskPriority::ClusterController );
|
||||
forceRecovery.getEndpoint( TaskPriority::ClusterController );
|
||||
healthMonitoring.getEndpoint( TaskPriority::FailureMonitor );
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize( Ar& ar ) {
|
||||
serializer(ar, openDatabase, failureMonitoring, databaseStatus, ping, getClientWorkers, forceRecovery,
|
||||
healthMonitoring);
|
||||
serializer(ar, openDatabase, failureMonitoring, databaseStatus, ping, getClientWorkers, forceRecovery);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -234,31 +230,6 @@ struct FailureMonitoringRequest {
|
|||
}
|
||||
};
|
||||
|
||||
struct HealthMonitoringReply {
|
||||
constexpr static FileIdentifier file_identifier = 6820326;
|
||||
Version healthInformationVersion;
|
||||
Arena arena;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, healthInformationVersion, arena);
|
||||
}
|
||||
};
|
||||
|
||||
struct HealthMonitoringRequest {
|
||||
constexpr static FileIdentifier file_identifier = 5867852;
|
||||
Version healthInformationVersion;
|
||||
int lastRequestElapsed;
|
||||
std::map<NetworkAddress, int> closedPeers;
|
||||
std::map<NetworkAddress, bool> peerStatus;
|
||||
ReplyPromise<struct HealthMonitoringReply> reply;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, lastRequestElapsed, healthInformationVersion, closedPeers, peerStatus, reply);
|
||||
}
|
||||
};
|
||||
|
||||
struct StatusReply {
|
||||
constexpr static FileIdentifier file_identifier = 9980504;
|
||||
StatusObject statusObj;
|
||||
|
|
|
@ -1,111 +0,0 @@
|
|||
/*
|
||||
* HealthMonitorClient.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/HealthMonitorClient.h"
|
||||
#include "fdbrpc/FailureMonitor.h"
|
||||
#include "fdbclient/ClusterInterface.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
#include <unordered_set>
|
||||
|
||||
// State object handed to healthMonitorClientLoop through a Reference<>. It derives from
// ReferenceCounted so it can be held that way and currently declares no data members of its own.
struct HealthMonitorClientState : ReferenceCounted<HealthMonitorClientState> {
|
||||
// Nothing to initialize.
HealthMonitorClientState() { }
|
||||
};
|
||||
|
||||
// Request/reply loop against one cluster controller's healthMonitoring endpoint.
//
// Each cycle: when the send timer fires, gather the closed-peer counts and peer status from the
// transport's health monitor, send them, and (for a non-local endpoint) arm a reply deadline. When
// the reply arrives, remember its version and schedule the next send after a jittered interval.
// If the deadline fires first, only a warning is traced; the outstanding request is left pending.
//
// Returns Void() when the controller breaks its promise; any other error is traced at SevError
// and rethrown. `hmState` is accepted but not used by this body.
ACTOR Future<Void> healthMonitorClientLoop(ClusterInterface controller, Reference<HealthMonitorClientState> hmState) {
	state Version knownVersion = 0;
	state Future<HealthMonitoringReply> pendingReply = Never();
	state Future<Void> sendTimer = delay(0, TaskPriority::FailureMonitor);
	state Future<Void> replyDeadline = Never();
	state double cycleStart = now();
	state double expectedInterval = 0;

	state int requestFailedTimeoutSecs = 2; /* seconds */
	try {
		loop {
			choose {
				// Reply received: record the version and schedule the next request.
				when(HealthMonitoringReply response = wait(pendingReply)) {
					g_network->setCurrentTask(TaskPriority::DefaultDelay);
					pendingReply = Never();
					replyDeadline = Never();
					knownVersion = response.healthInformationVersion;

					cycleStart = now();
					expectedInterval = FLOW_KNOBS->HEALTH_MONITOR_CLIENT_REQUEST_INTERVAL_SECS;
					sendTimer = delayJittered(expectedInterval, TaskPriority::FailureMonitor);
				}
				// No reply within the deadline: trace it and keep waiting on the request.
				when(wait(replyDeadline)) {
					g_network->setCurrentTask(TaskPriority::DefaultDelay);
					replyDeadline = Never();
					TraceEvent(SevWarn, "HealthMonitoringServerDown").detail("OldServerID", controller.id());
				}
				// Time to send the next request.
				when(wait(sendTimer)) {
					g_network->setCurrentTask(TaskPriority::DefaultDelay);
					sendTimer = Never();

					double elapsed = now() - cycleStart;
					double slowThreshold = .200 + expectedInterval + FLOW_KNOBS->MAX_BUGGIFIED_DELAY;
					double warnAlwaysThreshold = CLIENT_KNOBS->FAILURE_MIN_DELAY / 2;

					// The random draw is made only when the cycle was actually slow.
					if (elapsed > slowThreshold) {
						if (deterministicRandom()->random01() < elapsed / warnAlwaysThreshold) {
							auto severity = elapsed > warnAlwaysThreshold ? SevWarnAlways : SevWarn;
							TraceEvent(severity, "HealthMonitorClientSlow")
							    .detail("Elapsed", elapsed)
							    .detail("Expected", expectedInterval);
						}
					}

					HealthMonitoringRequest req;
					req.healthInformationVersion = knownVersion;
					// Count how many closed-connection history entries refer to each address.
					for (const auto& entry : FlowTransport::transport().healthMonitor()->getPeerClosedHistory()) {
						++req.closedPeers[entry.second];
					}
					req.peerStatus = FlowTransport::transport().healthMonitor()->getPeerStatus();

					pendingReply = controller.healthMonitoring.getReply(req, TaskPriority::FailureMonitor);
					if (!controller.healthMonitoring.getEndpoint().isLocal()) {
						replyDeadline = delay(requestFailedTimeoutSecs, TaskPriority::FailureMonitor);
					}
				}
			}
		}
	} catch (Error& e) {
		if (e.code() != error_code_broken_promise) {
			TraceEvent(SevError, "HealthMonitorClientError").error(e);
			throw; // goes nowhere
		}
		// broken promise from clustercontroller means it has died (and hopefully will be replaced)
		return Void();
	}
}
|
||||
|
||||
// Entry point for the health monitor client. Traces whether the transport is a client and then
// never becomes ready: client transports block on wait(Never()), everything else returns Never().
// `ci` is only referenced by the disabled code kept in the TODO below.
ACTOR Future<Void> healthMonitorClient(Reference<AsyncVar<Optional<struct ClusterInterface>>> ci) {
	const bool transportIsClient = FlowTransport::transport().isClient();
	TraceEvent("HealthMonitorStart").detail("IsClient", transportIsClient);

	if (transportIsClient) {
		wait(Never());
	}

	return Never();
	// TODO: Re-enable centralized health monitoring.
	// state Reference<HealthMonitorClientState> hmState =
	//     Reference<HealthMonitorClientState>(new HealthMonitorClientState());
	// loop {
	// 	state Future<Void> client =
	// 		ci->get().present() ? healthMonitorClientLoop(ci->get().get(), hmState) : Void();
	// 	wait(ci->onChange());
	// }
}
|
|
@ -1,29 +0,0 @@
|
|||
/*
|
||||
* HealthMonitorClient.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBCLIENT_HEALTHMONITORCLIENT_H
|
||||
#define FDBCLIENT_HEALTHMONITORCLIENT_H
|
||||
#pragma once
|
||||
|
||||
#include "flow/flow.h"
|
||||
|
||||
Future<Void> healthMonitorClient(Reference<AsyncVar<Optional<struct ClusterInterface>>> const&);
|
||||
|
||||
#endif
|
|
@ -399,38 +399,32 @@ ACTOR Future<Void> connectionWriter( Reference<Peer> self, Reference<IConnection
|
|||
}
|
||||
|
||||
// Holds `address` in the failed state while the transport's health monitor reports too many closed
// connections to it, then marks it available again and completes.
//
// Loop behavior:
//   - While the HEALTH_MONITOR_MARK_FAILED_UNSTABLE_CONNECTIONS knob is set, the address is public
//     and tooManyConnectionsClosed(address) holds: on the first pass trace
//     "TooManyConnectionsClosedMarkFailed" and set the failure status to failed; then wait a
//     jittered 2 * MAX_RECONNECTION_TIME and re-check.
//   - Otherwise: set the failure status to available (tracing
//     "TooManyConnectionsClosedMarkAvailable" if more than one waiting pass happened) and stop.
//
// `count` is incremented once per waiting pass, before the wait, so the first-pass check and the
// `count > 1` check both see how many passes have run. The former no-op `catch (Error& e) { throw e; }`
// wrapper is gone; errors propagate to the caller unchanged.
ACTOR Future<Void> delayedHealthUpdate(NetworkAddress address) {
	state double start = now();
	state int count = 0;
	loop {
		if (FLOW_KNOBS->HEALTH_MONITOR_MARK_FAILED_UNSTABLE_CONNECTIONS &&
		    FlowTransport::transport().healthMonitor()->tooManyConnectionsClosed(address) && address.isPublic()) {
			if (count == 0) {
				// Only the first pass traces and flips the status to failed.
				TraceEvent("TooManyConnectionsClosedMarkFailed")
				    .detail("Dest", address)
				    .detail("StartTime", start)
				    .detail("ClosedCount", FlowTransport::transport().healthMonitor()->closedConnectionsCount(address));
				IFailureMonitor::failureMonitor().setStatus(address, FailureStatus(true));
			}
			++count;
			wait(delayJittered(FLOW_KNOBS->MAX_RECONNECTION_TIME * 2.0));
		} else {
			if (count > 1)
				TraceEvent("TooManyConnectionsClosedMarkAvailable")
				    .detail("Dest", address)
				    .detail("StartTime", start)
				    .detail("TimeElapsed", now() - start)
				    .detail("ClosedCount", FlowTransport::transport().healthMonitor()->closedConnectionsCount(address));
			IFailureMonitor::failureMonitor().setStatus(address, FailureStatus(false));
			break;
		}
	}
	return Void();
}
|
||||
|
||||
ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
|
||||
|
|
|
@ -1957,24 +1957,6 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData*
|
|||
}
|
||||
}
|
||||
|
||||
// Server-side bookkeeping type: a two-level map keyed by NetworkAddress with a bool leaf.
// NOTE(review): presumably outer key = reporting process, inner key = peer, value = failed flag;
// confirm before relying on it.
struct HealthMonitorServerState {
|
||||
std::map<NetworkAddress, std::map<NetworkAddress, bool>> failureState;
|
||||
};
|
||||
|
||||
// Cluster-controller side of health monitoring: drains the request stream forever and, for each
// request, traces the number of closed-peer entries and the primary address of the reply endpoint.
// No reply is sent on req.reply, and `uniqueID`, `self` and `currentVersion` are not used by the body.
ACTOR Future<Void> healthMonitoringServer(UID uniqueID, ClusterControllerData* self,
                                          FutureStream<HealthMonitoringRequest> requests) {
	state Version currentVersion = 0;

	loop {
		HealthMonitoringRequest req = waitNext(requests);
		NetworkAddress senderAddress = req.reply.getEndpoint().getPrimaryAddress();
		TraceEvent("HealthMonitorRequestReceived")
		    .detail("Size", req.closedPeers.size())
		    .detail("EndpointAddress", senderAddress);
	}
}
|
||||
|
||||
ACTOR Future<vector<TLogInterface>> requireAll( vector<Future<Optional<vector<TLogInterface>>>> in ) {
|
||||
state vector<TLogInterface> out;
|
||||
state int i;
|
||||
|
@ -3070,7 +3052,6 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
|
|||
state Future<ErrorOr<Void>> error = errorOr( actorCollection( self.addActor.getFuture() ) );
|
||||
|
||||
self.addActor.send( failureDetectionServer( self.id, &self, interf.clientInterface.failureMonitoring.getFuture() ) );
|
||||
self.addActor.send( healthMonitoringServer( self.id, &self, interf.clientInterface.healthMonitoring.getFuture() ) );
|
||||
self.addActor.send( clusterWatchDatabase( &self, &self.db ) ); // Start the master database
|
||||
self.addActor.send( self.updateWorkerList.init( self.db.db ) );
|
||||
self.addActor.send( statusServer( interf.clientInterface.databaseStatus.getFuture(), &self, coordinators));
|
||||
|
|
|
@ -25,7 +25,6 @@
|
|||
#include "flow/TDMetric.actor.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/HealthMonitorClient.h"
|
||||
#include "fdbclient/MetricLogger.h"
|
||||
#include "fdbserver/BackupInterface.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
|
@ -1600,7 +1599,6 @@ ACTOR Future<Void> fdbd(
|
|||
|
||||
actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo), "MonitorAndWriteCCPriorityInfo"));
|
||||
actors.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities ), "ClusterController") );
|
||||
actors.push_back( reportErrors(healthMonitorClient( ci ), "HealthMonitorClient") );
|
||||
actors.push_back( reportErrors(extractClusterInterface( cc, ci ), "ExtractClusterInterface") );
|
||||
actors.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncPriorityInfo, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix, recoveredDiskFiles, memoryProfileThreshold, coordFolder, whitelistBinPaths), "WorkerServer", UID(), &normalWorkerErrors()) );
|
||||
state Future<Void> firstConnect = reportErrors( printOnFirstConnected(ci), "ClusterFirstConnectedError" );
|
||||
|
|
Loading…
Reference in New Issue