Combined proxy health metrics replies into single message type

This commit is contained in:
Trevor Clinkenbeard 2019-02-23 10:13:43 -08:00
parent edc0c5bf2b
commit ff9a7cb2f1
3 changed files with 40 additions and 66 deletions

View File

@ -46,7 +46,6 @@ struct MasterProxyInterface {
RequestStream< struct TxnStateRequest > txnState;
RequestStream< struct GetHealthMetricsRequest > getHealthMetrics;
RequestStream< struct GetDetailedHealthMetricsRequest > getDetailedHealthMetrics;
UID id() const { return commit.getEndpoint().token; }
std::string toString() const { return id().shortString(); }
@ -58,7 +57,7 @@ struct MasterProxyInterface {
void serialize(Archive& ar) {
serializer(ar, locality, commit, getConsistentReadVersion, getKeyServersLocations,
waitFailure, getStorageServerRejoinInfo, getRawCommittedVersion,
txnState, getHealthMetrics, getDetailedHealthMetrics);
txnState, getHealthMetrics);
}
void initEndpoints() {
@ -238,66 +237,43 @@ struct TxnStateRequest {
struct GetHealthMetricsRequest
{
ReplyPromise<struct GetHealthMetricsReply> reply;
bool detailed;
GetHealthMetricsRequest(bool detailed = false) : detailed(detailed) {}
template <class Ar>
void serialize(Ar& ar)
{
serializer(ar, reply);
serializer(ar, reply, detailed);
}
};
struct GetHealthMetricsReply
{
Standalone<StringRef> serialized;
HealthMetrics healthMetrics;
template <class Ar>
void serialize(Ar& ar)
explicit GetHealthMetricsReply(HealthMetrics healthMetrics = HealthMetrics()) :
healthMetrics(healthMetrics)
{
serializer(ar, healthMetrics);
update(healthMetrics, true, true);
}
};
struct GetDetailedHealthMetricsRequest
{
ReplyPromise<struct GetDetailedHealthMetricsReply> reply;
template <class Ar>
void serialize(Ar& ar)
{
serializer(ar, reply);
}
};
struct GetDetailedHealthMetricsReply
{
Standalone<StringRef> serialized;
GetDetailedHealthMetricsReply() {}
void setHealthMetrics(const HealthMetrics& healthMetrics)
void update(HealthMetrics healthMetrics, bool detailedInput, bool detailedOutput)
{
this->healthMetrics.update(healthMetrics, detailedInput, detailedOutput);
BinaryWriter bw(IncludeVersion());
bw << healthMetrics;
bw << this->healthMetrics;
serialized = Standalone<StringRef>(bw.toStringRef());
}
HealthMetrics getHealthMetrics()
{
if (serialized.size() > 0) {
BinaryReader br(serialized, IncludeVersion());
HealthMetrics result;
br >> result;
return result;
}
else
return HealthMetrics();
}
template <class Ar>
void serialize(Ar& ar)
{
void serialize(Ar& ar) {
serializer(ar, serialized);
if (ar.isDeserializing) {
BinaryReader br(serialized, IncludeVersion());
br >> healthMetrics;
}
}
};

View File

@ -505,7 +505,7 @@ ACTOR static Future<Void> updateHealthMetricsActor(DatabaseContext *cx) {
when(state GetHealthMetricsReply rep =
wait(loadBalance(cx->getMasterProxies(),
&MasterProxyInterface::getHealthMetrics,
GetHealthMetricsRequest()))) {
GetHealthMetricsRequest(false)))) {
cx->healthMetrics.update(rep.healthMetrics, false, true);
break;
}
@ -518,11 +518,11 @@ ACTOR static Future<Void> updateHealthMetricsActor(DatabaseContext *cx) {
loop {
choose {
when(wait(cx->onMasterProxiesChanged())) {}
when(state GetDetailedHealthMetricsReply detailedRep =
when(state GetHealthMetricsReply detailedRep =
wait(loadBalance(cx->getMasterProxies(),
&MasterProxyInterface::getDetailedHealthMetrics,
GetDetailedHealthMetricsRequest()))) {
cx->healthMetrics.update(detailedRep.getHealthMetrics(), true, true);
&MasterProxyInterface::getHealthMetrics,
GetHealthMetricsRequest(true)))) {
cx->healthMetrics.update(detailedRep.healthMetrics, true, true);
detailedTimer = delay(
CLIENT_KNOBS->UPDATE_DETAILED_HEALTH_METRICS_INTERVAL);
break;

View File

@ -87,8 +87,8 @@ Future<Void> forwardValue(Promise<T> out, Future<T> in)
int getBytes(Promise<Version> const& r) { return 0; }
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, double* outTransactionRate, HealthMetrics* healthMetrics,
GetDetailedHealthMetricsReply* getDetailedHealthMetricsReply) {
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, double* outTransactionRate, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> nextDetailedRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
@ -123,16 +123,18 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
}
when ( GetRateInfoReply rep = wait(reply) ) {
reply = Never();
healthMetrics->update(rep.healthMetrics, rep.detailed, true);
*outTransactionRate = rep.transactionRate;
// TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
lastTC = *inTransactionCount;
leaseTimeout = delay(rep.leaseDuration);
nextRequestTimer = delayJittered(rep.leaseDuration / 2);
if (rep.detailed) {
detailedHealthMetricsReply->update(rep.healthMetrics, true, true);
detailedLeaseTimeout = delay(SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE);
nextDetailedRequestTimer = delayJittered(SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE / 2);
getDetailedHealthMetricsReply->setHealthMetrics(*healthMetrics);
}
else {
healthMetricsReply->update(rep.healthMetrics, false, true);
}
}
when ( wait(leaseTimeout ) ) {
@ -1115,8 +1117,8 @@ ACTOR static Future<Void> transactionStarter(
MasterProxyInterface proxy,
Reference<AsyncVar<ServerDBInfo>> db,
PromiseStream<Future<Void>> addActor,
ProxyCommitData* commitData, HealthMetrics* healthMetrics,
GetDetailedHealthMetricsReply* getDetailedHealthMetricsReply)
ProxyCommitData* commitData, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply)
{
state double lastGRVTime = 0;
state PromiseStream<Void> GRVTimer;
@ -1129,7 +1131,7 @@ ACTOR static Future<Void> transactionStarter(
state vector<MasterProxyInterface> otherProxies;
state PromiseStream<double> replyTimes;
addActor.send(getRate(proxy.id(), db, &transactionCount, &transactionRate, healthMetrics, getDetailedHealthMetricsReply));
addActor.send(getRate(proxy.id(), db, &transactionCount, &transactionRate, healthMetricsReply, detailedHealthMetricsReply));
addActor.send(queueTransactionStartRequests(&transactionQueue, proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats));
// Get a list of the other proxies that go together with us
@ -1338,21 +1340,17 @@ ACTOR static Future<Void> readRequestServer(
}
}
ACTOR Future<Void> healthMetricsRequestServer(MasterProxyInterface proxy, HealthMetrics* healthMetrics, GetDetailedHealthMetricsReply* getDetailedHealthMetricsReply)
ACTOR Future<Void> healthMetricsRequestServer(MasterProxyInterface proxy, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply)
{
loop {
choose {
when(GetHealthMetricsRequest req =
waitNext(proxy.getHealthMetrics.getFuture()))
{
GetHealthMetricsReply rep;
rep.healthMetrics.update(*healthMetrics, true, false);
req.reply.send(rep);
}
when(GetDetailedHealthMetricsRequest req =
waitNext(proxy.getDetailedHealthMetrics.getFuture()))
{
req.reply.send(*getDetailedHealthMetricsReply);
if (req.detailed)
req.reply.send(*detailedHealthMetricsReply);
else
req.reply.send(*healthMetricsReply);
}
}
}
@ -1436,8 +1434,8 @@ ACTOR Future<Void> masterProxyServerCore(
state std::set<Sequence> txnSequences;
state Sequence maxSequence = std::numeric_limits<Sequence>::max();
state HealthMetrics healthMetrics;
state GetDetailedHealthMetricsReply getDetailedHealthMetricsReply;
state GetHealthMetricsReply healthMetricsReply;
state GetHealthMetricsReply detailedHealthMetricsReply;
addActor.send( fetchVersions(&commitData) );
addActor.send( waitFailureServer(proxy.waitFailure.getFuture()) );
@ -1469,9 +1467,9 @@ ACTOR Future<Void> masterProxyServerCore(
TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit);
addActor.send(monitorRemoteCommitted(&commitData, db));
addActor.send(transactionStarter(proxy, db, addActor, &commitData, &healthMetrics, &getDetailedHealthMetricsReply));
addActor.send(transactionStarter(proxy, db, addActor, &commitData, &healthMetricsReply, &detailedHealthMetricsReply));
addActor.send(readRequestServer(proxy, &commitData));
addActor.send(healthMetricsRequestServer(proxy, &healthMetrics, &getDetailedHealthMetricsReply));
addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply));
// wait for txnStateStore recovery
wait(success(commitData.txnStateStore->readValue(StringRef())));