Proxy runs healthMetricsRequestServer to handle incoming health metrics requests

This commit is contained in:
Trevor Clinkenbeard 2019-02-01 10:58:42 -08:00
parent e549def5a9
commit 4daf49ff4d
2 changed files with 27 additions and 1 deletions

View File

@ -42,6 +42,9 @@ struct MasterProxyInterface {
RequestStream< struct GetRawCommittedVersionRequest > getRawCommittedVersion;
RequestStream< struct TxnStateRequest > txnState;
RequestStream< struct GetHealthMetricsRequest > getHealthMetrics;
RequestStream< struct GetDetailedHealthMetricsRequest > getDetailedHealthMetrics;
UID id() const { return commit.getEndpoint().token; }
std::string toString() const { return id().shortString(); }
bool operator == (MasterProxyInterface const& r) const { return id() == r.id(); }
@ -50,7 +53,9 @@ struct MasterProxyInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, locality, commit, getConsistentReadVersion, getKeyServersLocations, waitFailure, getStorageServerRejoinInfo, getRawCommittedVersion, txnState);
serializer(ar, locality, commit, getConsistentReadVersion, getKeyServersLocations,
waitFailure, getStorageServerRejoinInfo, getRawCommittedVersion,
txnState, getHealthMetrics, getDetailedHealthMetrics);
}
void initEndpoints() {

View File

@ -1293,6 +1293,26 @@ ACTOR static Future<Void> readRequestServer(
}
}
ACTOR Future<Void> healthMetricsRequestServer(MasterProxyInterface proxy, HealthMetrics* healthMetrics, GetDetailedHealthMetricsReply* getDetailedHealthMetricsReply)
{
loop {
choose {
when(GetHealthMetricsRequest req =
waitNext(proxy.getHealthMetrics.getFuture()))
{
GetHealthMetricsReply rep;
rep.healthMetrics.update(*healthMetrics, true, false);
req.reply.send(rep);
}
when(GetDetailedHealthMetricsRequest req =
waitNext(proxy.getDetailedHealthMetrics.getFuture()))
{
req.reply.send(*getDetailedHealthMetricsReply);
}
}
}
}
ACTOR Future<Void> monitorRemoteCommitted(ProxyCommitData* self, Reference<AsyncVar<ServerDBInfo>> db) {
loop {
wait(delay(0)); //allow this actor to be cancelled if we are removed after db changes.
@ -1406,6 +1426,7 @@ ACTOR Future<Void> masterProxyServerCore(
addActor.send(monitorRemoteCommitted(&commitData, db));
addActor.send(transactionStarter(proxy, master, db, addActor, &commitData, &healthMetrics, &getDetailedHealthMetricsReply));
addActor.send(readRequestServer(proxy, &commitData));
addActor.send(healthMetricsRequestServer(proxy, &healthMetrics, &getDetailedHealthMetricsReply));
// wait for txnStateStore recovery
Optional<Value> _ = wait(commitData.txnStateStore->readValue(StringRef()));