Add HealthMonitoring skeleton code

This commit is contained in:
Vishesh Yadav 2020-02-17 21:54:05 -08:00
parent 04f925f770
commit d90e168e24
14 changed files with 302 additions and 2 deletions

View File

@ -21,6 +21,8 @@ set(FDBCLIENT_SRCS
FDBOptions.h
FDBTypes.h
FileBackupAgent.actor.cpp
HealthMonitorClient.h
HealthMonitorClient.actor.cpp
HTTP.actor.cpp
IClientApi.h
JsonBuilder.cpp

View File

@ -36,6 +36,7 @@ struct ClusterInterface {
RequestStream< ReplyPromise<Void> > ping;
RequestStream< struct GetClientWorkersRequest > getClientWorkers;
RequestStream< struct ForceRecoveryRequest > forceRecovery;
RequestStream< struct HealthMonitoringRequest > healthMonitoring;
bool operator == (ClusterInterface const& r) const { return id() == r.id(); }
bool operator != (ClusterInterface const& r) const { return id() != r.id(); }
@ -48,7 +49,8 @@ struct ClusterInterface {
databaseStatus.getFuture().isReady() ||
ping.getFuture().isReady() ||
getClientWorkers.getFuture().isReady() ||
forceRecovery.getFuture().isReady();
forceRecovery.getFuture().isReady() ||
healthMonitoring.getFuture().isReady();
}
void initEndpoints() {
@ -58,11 +60,13 @@ struct ClusterInterface {
ping.getEndpoint( TaskPriority::ClusterController );
getClientWorkers.getEndpoint( TaskPriority::ClusterController );
forceRecovery.getEndpoint( TaskPriority::ClusterController );
healthMonitoring.getEndpoint( TaskPriority::FailureMonitor );
}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, openDatabase, failureMonitoring, databaseStatus, ping, getClientWorkers, forceRecovery);
serializer(ar, openDatabase, failureMonitoring, databaseStatus, ping, getClientWorkers, forceRecovery,
healthMonitoring);
}
};
@ -230,6 +234,31 @@ struct FailureMonitoringRequest {
}
};
// Reply to a HealthMonitoringRequest.
struct HealthMonitoringReply {
	constexpr static FileIdentifier file_identifier = 6820326;

	// Version reported by the server; the client loop stores it and echoes it back in its next
	// request. Defaults to 0 so that a default-constructed reply never serializes an
	// indeterminate value.
	Version healthInformationVersion = 0;

	// Serialized together with the version.
	// NOTE(review): presumably backs future arena-allocated payload - nothing uses it yet; confirm.
	Arena arena;

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, healthInformationVersion, arena);
	}
};
// Request sent by a process to the cluster controller's healthMonitoring endpoint, describing
// the sender's view of its peers.
struct HealthMonitoringRequest {
	constexpr static FileIdentifier file_identifier = 5867852;

	// Version taken from the most recent HealthMonitoringReply (0 before any reply was seen).
	Version healthInformationVersion = 0;

	// Defaults to 0: the sender shown in this change never assigns it, and without an
	// initializer an indeterminate int would be serialized onto the wire.
	// NOTE(review): presumably the time since the previous request - units unconfirmed.
	int lastRequestElapsed = 0;

	// Number of recorded connection closures per peer address.
	std::map<NetworkAddress, int> closedPeers;

	// Per-peer availability flag as seen by the sender.
	std::map<NetworkAddress, bool> peerStatus;

	ReplyPromise<struct HealthMonitoringReply> reply;

	template <class Ar>
	void serialize(Ar& ar) {
		// Field order defines the wire format; keep it unchanged.
		serializer(ar, lastRequestElapsed, healthInformationVersion, closedPeers, peerStatus, reply);
	}
};
struct StatusReply {
constexpr static FileIdentifier file_identifier = 9980504;
StatusObject statusObj;

View File

@ -0,0 +1,109 @@
/*
* HealthMonitorClient.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/HealthMonitorClient.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbclient/ClusterInterface.h"
#include "flow/actorcompiler.h" // has to be last include
#include <unordered_set>
// Reference-counted state handed to every healthMonitorClientLoop() instance.
// It carries no data yet.
struct HealthMonitorClientState : ReferenceCounted<HealthMonitorClientState> {
	HealthMonitorClientState() = default;
};
// Repeatedly sends a HealthMonitoringRequest to `controller` and waits for the reply.
//
// Each request carries the last version received from the controller, the number of recorded
// connection closures per peer and the availability of every known peer. After a reply the
// next request is scheduled CLIENT_REQUEST_INTERVAL_SECS (jittered) later. If the endpoint is
// remote and no reply arrives within CLIENT_REQUEST_FAILED_TIMEOUT_SECS, a warning is traced.
//
// Returns Void() when the request fails with broken_promise; any other error is traced at
// SevError and rethrown.
ACTOR Future<Void> healthMonitorClientLoop(ClusterInterface controller, Reference<HealthMonitorClientState> hmState) {
	state Version knownVersion = 0;
	state Future<HealthMonitoringReply> pendingReply = Never();
	state Future<Void> sendTimer = delay(0, TaskPriority::FailureMonitor);
	state Future<Void> replyDeadline = Never();
	state double lastReplyTime = now();
	state double expectedWait = 0;
	state int CLIENT_REQUEST_INTERVAL_SECS = 10;
	state int CLIENT_REQUEST_FAILED_TIMEOUT_SECS = 2;

	try {
		loop {
			choose {
				// The controller answered: remember its version and arm the timer for the next request.
				when(HealthMonitoringReply reply = wait(pendingReply)) {
					g_network->setCurrentTask(TaskPriority::DefaultDelay);
					pendingReply = Never();
					replyDeadline = Never();
					knownVersion = reply.healthInformationVersion;
					lastReplyTime = now();
					expectedWait = CLIENT_REQUEST_INTERVAL_SECS;
					sendTimer = delayJittered(expectedWait, TaskPriority::FailureMonitor);
				}
				// No reply within the deadline: only report it, the outstanding request stays pending.
				when(wait(replyDeadline)) {
					g_network->setCurrentTask(TaskPriority::DefaultDelay);
					replyDeadline = Never();
					TraceEvent(SevWarn, "HealthMonitoringServerDown").detail("OldServerID", controller.id());
				}
				// Time to send the next request.
				when(wait(sendTimer)) {
					g_network->setCurrentTask(TaskPriority::DefaultDelay);
					sendTimer = Never();

					// Warn when this loop woke up noticeably later than the interval it asked for.
					const double sinceLastReply = now() - lastReplyTime;
					const double slowAfter = .200 + expectedWait + FLOW_KNOBS->MAX_BUGGIFIED_DELAY;
					const double alwaysWarnAfter = CLIENT_KNOBS->FAILURE_MIN_DELAY / 2;
					if (sinceLastReply > slowAfter &&
					    deterministicRandom()->random01() < sinceLastReply / alwaysWarnAfter) {
						TraceEvent(sinceLastReply > alwaysWarnAfter ? SevWarnAlways : SevWarn, "HealthMonitorClientSlow")
						    .detail("Elapsed", sinceLastReply)
						    .detail("Expected", expectedWait);
					}

					HealthMonitoringRequest req;
					req.healthInformationVersion = knownVersion;
					// Collapse the closure history into a per-address count.
					for (const auto& closure : FlowTransport::transport().healthMonitor()->getPeerClosedHistory()) {
						++req.closedPeers[closure.second];
					}
					req.peerStatus = FlowTransport::transport().healthMonitor()->getPeerStatus();

					pendingReply = controller.healthMonitoring.getReply(req, TaskPriority::FailureMonitor);
					// A reply deadline is only armed for remote endpoints.
					if (!controller.healthMonitoring.getEndpoint().isLocal()) {
						replyDeadline = delay(CLIENT_REQUEST_FAILED_TIMEOUT_SECS, TaskPriority::FailureMonitor);
					}
				}
			}
		}
	} catch (Error& e) {
		// broken promise from clustercontroller means it has died (and hopefully will be replaced)
		if (e.code() == error_code_broken_promise)
			return Void();
		TraceEvent(SevError, "HealthMonitorClientError").error(e);
		throw; // goes nowhere
	}
}
// Keeps one healthMonitorClientLoop() running against whichever cluster interface `ci`
// currently holds, replacing it every time `ci` changes.
//
// When the transport was created as a client this actor traces its start and then waits
// forever without sending anything.
ACTOR Future<Void> healthMonitorClient(Reference<AsyncVar<Optional<struct ClusterInterface>>> ci) {
	const bool runningAsClient = FlowTransport::transport().isClient();
	TraceEvent("HealthMonitorStart").detail("IsClient", runningAsClient);
	if (runningAsClient) {
		wait(Never());
	}

	state Reference<HealthMonitorClientState> hmState(new HealthMonitorClientState());
	loop {
		// Assigning to `client` drops the loop that was started for the previous interface.
		state Future<Void> client =
		    !ci->get().present() ? Void() : healthMonitorClientLoop(ci->get().get(), hmState);
		wait(ci->onChange());
	}
}

View File

@ -0,0 +1,29 @@
/*
* HealthMonitorClient.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_HEALTHMONITORCLIENT_H
#define FDBCLIENT_HEALTHMONITORCLIENT_H
#pragma once
#include "flow/flow.h"
// Starts the health monitoring client actor. The actor follows the cluster interface published
// through the given AsyncVar and restarts its request loop whenever that value changes.
// On a transport created as a client it never sends anything and the returned future never
// becomes ready.
Future<Void> healthMonitorClient(Reference<AsyncVar<Optional<struct ClusterInterface>>> const&);
#endif

View File

@ -48,6 +48,7 @@
<ClInclude Include="HTTP.h" />
<ClInclude Include="KeyBackedTypes.h" />
<ClInclude Include="MetricLogger.h" />
<ClInclude Include="HealthMonitorClient.h" />
<ClInclude Include="IClientApi.h" />
<ClInclude Include="JsonBuilder.h" />
<ClInclude Include="JSONDoc.h" />
@ -110,6 +111,7 @@
<ActorCompiler Include="BlobStore.actor.cpp" />
<ActorCompiler Include="DatabaseBackupAgent.actor.cpp" />
<ClCompile Include="DatabaseConfiguration.cpp" />
<ActorCompiler Include="HealthMonitorClient.actor.cpp" />
<ClCompile Include="FDBOptions.g.cpp" />
<ActorCompiler Include="FileBackupAgent.actor.cpp" />
<ActorCompiler Include="HTTP.actor.cpp" />

View File

@ -13,6 +13,7 @@ set(FDBRPC_SRCS
FlowTransport.actor.cpp
genericactors.actor.h
genericactors.actor.cpp
HealthMonitor.actor.cpp
IAsyncFile.actor.cpp
LoadBalance.actor.h
Locality.cpp

View File

@ -28,6 +28,7 @@
#include "flow/crc32c.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/HealthMonitor.h"
#include "fdbrpc/genericactors.actor.h"
#include "fdbrpc/simulator.h"
#include "flow/ActorCollection.h"
@ -189,6 +190,7 @@ public:
std::vector<Future<Void>> listeners;
std::unordered_map<NetworkAddress, Reference<struct Peer>> peers;
std::unordered_map<NetworkAddress, std::pair<double, double>> closedPeers;
HealthMonitor healthMonitor;
Reference<AsyncVar<bool>> degraded;
bool warnAlwaysForLargePacket;
@ -535,6 +537,10 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
it.second = now();
}
if (self->destination.isPublic() && !FlowTransport::transport().isClient()) {
FlowTransport::transport().healthMonitor()->reportPeerClosed(self->destination);
}
if (conn) {
conn->close();
conn = Reference<IConnection>();
@ -1316,3 +1322,15 @@ void FlowTransport::createInstance(bool isClient, uint64_t transportId) {
g_network->setGlobal(INetwork::enNetworkAddressFunc, (flowGlobalType) &FlowTransport::getGlobalLocalAddress);
g_network->setGlobal(INetwork::enNetworkAddressesFunc, (flowGlobalType) &FlowTransport::getGlobalLocalAddresses);
}
// Returns a non-owning pointer to the HealthMonitor stored inside this transport's data.
HealthMonitor* FlowTransport::healthMonitor() {
	HealthMonitor& monitor = self->healthMonitor;
	return &monitor;
}
std::set<NetworkAddress> FlowTransport::getPeers() const {
std::set<NetworkAddress> result;
for (const auto& it : self->peers) {
result.insert(it.first);
}
return result;
}

View File

@ -23,6 +23,7 @@
#pragma once
#include <algorithm>
#include "fdbrpc/HealthMonitor.h"
#include "flow/genericactors.actor.h"
#include "flow/network.h"
#include "flow/FileIdentifier.h"
@ -205,6 +206,9 @@ public:
Endpoint loadedEndpoint(const UID& token);
HealthMonitor* healthMonitor();
std::set<NetworkAddress> getPeers() const;
private:
class TransportData* self;
};

View File

@ -0,0 +1,48 @@
/*
* HealthMonitor.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/FlowTransport.h"
#include "fdbrpc/HealthMonitor.h"
const int CLIENT_REQUEST_INTERVAL_SECS = 10; // TODO (Vishesh) Make a Knob
// Appends a (current time, peerAddress) record to the end of the closure history.
void HealthMonitor::reportPeerClosed(const NetworkAddress& peerAddress) {
	peerClosedHistory.emplace_back(now(), peerAddress);
}
// Drops closure records older than CLIENT_REQUEST_INTERVAL_SECS and returns the remaining
// history.
//
// Records are appended by reportPeerClosed() together with the time of the call, so expired
// entries are looked for at the front only; trimming stops at the first record that is still
// inside the window.
//
// The previous implementation called pop_front() from inside a range-based for loop over the
// same deque. That destroys the element the loop iterator refers to, which is undefined
// behaviour; draining from the front with an explicit emptiness check avoids it.
const std::deque<std::pair<double, NetworkAddress>>& HealthMonitor::getPeerClosedHistory() {
	const double oldestAllowed = now() - CLIENT_REQUEST_INTERVAL_SECS;
	while (!peerClosedHistory.empty() && peerClosedHistory.front().first < oldestAllowed) {
		peerClosedHistory.pop_front();
	}
	return peerClosedHistory;
}
std::map<NetworkAddress, bool> HealthMonitor::getPeerStatus() const {
std::map<NetworkAddress, bool> result;
for (const auto& peer : FlowTransport::transport().getPeers()) {
result[peer] = IFailureMonitor::failureMonitor().getState(peer).isAvailable();
}
return result;
}

41
fdbrpc/HealthMonitor.h Normal file
View File

@ -0,0 +1,41 @@
/*
* HealthMonitor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBRPC_HEALTH_MONITOR_H
#define FDBRPC_HEALTH_MONITOR_H
#include <deque>
#include <unordered_map>
#include <unordered_set>
#include <flow/flow.h>
class HealthMonitor {
public:
void reportPeerClosed(const NetworkAddress& peerAddress);
const std::deque<std::pair<double, NetworkAddress>>& getPeerClosedHistory();
std::map<NetworkAddress, bool> getPeerStatus() const;
private:
std::deque<std::pair<double, NetworkAddress>> peerClosedHistory;
};
#endif // FDBRPC_HEALTH_MONITOR_H

View File

@ -19,6 +19,7 @@
<ActorCompiler Include="FlowTests.actor.cpp" />
<ActorCompiler Include="genericactors.actor.cpp" />
<ActorCompiler Include="FlowTransport.actor.cpp" />
<ActorCompiler Include="HealthMonitor.actor.cpp" />
<ActorCompiler Include="IAsyncFile.actor.cpp" />
<ClCompile Include="AsyncFileWriteChecker.cpp" />
<ClCompile Include="libcoroutine\Common.c" />
@ -70,6 +71,7 @@
<ClInclude Include="AsyncFileWriteChecker.h" />
<ClInclude Include="ContinuousSample.h" />
<ClInclude Include="FailureMonitor.h" />
<ClInclude Include="HealthMonitor.h" />
<ActorCompiler Include="LoadBalance.actor.h">
<EnableCompile>false</EnableCompile>
</ActorCompiler>

View File

@ -16,6 +16,7 @@
<ActorCompiler Include="genericactors.actor.h" />
<ActorCompiler Include="FailureMonitor.actor.cpp" />
<ActorCompiler Include="FlowTransport.actor.cpp" />
<ActorCompiler Include="HealthMonitor.actor.cpp" />
<ActorCompiler Include="ActorFuzz.actor.cpp" />
<ActorCompiler Include="AsyncFileReadAhead.actor.h" />
<ActorCompiler Include="IAsyncFile.actor.cpp" />

View File

@ -1957,6 +1957,17 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData*
}
}
// Serves HealthMonitoringRequests arriving on `requests`.
//
// Every request is traced and answered with the server's current health information version.
// Answering matters even in this skeleton: dropping the request without sending a reply breaks
// its ReplyPromise, and healthMonitorClientLoop treats broken_promise as "the cluster
// controller died" and stops sending requests.
//
// uniqueID and self are not used yet.
ACTOR Future<Void> healthMonitoringServer(UID uniqueID, ClusterControllerData* self,
                                          FutureStream<HealthMonitoringRequest> requests) {
	state Version currentVersion = 0;

	loop choose {
		when(HealthMonitoringRequest req = waitNext(requests)) {
			TraceEvent("HealthMonitorRequestReceived");

			HealthMonitoringReply reply;
			reply.healthInformationVersion = currentVersion;
			req.reply.send(reply);
		}
	}
}
ACTOR Future<vector<TLogInterface>> requireAll( vector<Future<Optional<vector<TLogInterface>>>> in ) {
state vector<TLogInterface> out;
state int i;
@ -3052,6 +3063,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
state Future<ErrorOr<Void>> error = errorOr( actorCollection( self.addActor.getFuture() ) );
self.addActor.send( failureDetectionServer( self.id, &self, interf.clientInterface.failureMonitoring.getFuture() ) );
self.addActor.send( healthMonitoringServer( self.id, &self, interf.clientInterface.healthMonitoring.getFuture() ) );
self.addActor.send( clusterWatchDatabase( &self, &self.db ) ); // Start the master database
self.addActor.send( self.updateWorkerList.init( self.db.db ) );
self.addActor.send( statusServer( interf.clientInterface.databaseStatus.getFuture(), &self, coordinators));

View File

@ -25,6 +25,7 @@
#include "flow/TDMetric.actor.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/HealthMonitorClient.h"
#include "fdbclient/MetricLogger.h"
#include "fdbserver/BackupInterface.h"
#include "fdbserver/WorkerInterface.actor.h"
@ -1599,6 +1600,7 @@ ACTOR Future<Void> fdbd(
actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo), "MonitorAndWriteCCPriorityInfo"));
actors.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities ), "ClusterController") );
actors.push_back( reportErrors(healthMonitorClient( ci ), "HealthMonitorClient") );
actors.push_back( reportErrors(extractClusterInterface( cc, ci ), "ExtractClusterInterface") );
actors.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncPriorityInfo, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix, recoveredDiskFiles, memoryProfileThreshold, coordFolder, whitelistBinPaths), "WorkerServer", UID(), &normalWorkerErrors()) );
state Future<Void> firstConnect = reportErrors( printOnFirstConnected(ci), "ClusterFirstConnectedError" );