Don't block the data distributor when getting a GetDataDistributorMetricsRequest.

This commit is contained in:
A.J. Beamon 2020-08-21 09:26:18 -07:00
parent 8f9ae86212
commit 6380b92b10
1 changed files with 17 additions and 10 deletions

View File

@ -4817,6 +4817,21 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
return Void(); return Void();
} }
ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req, PromiseStream<GetMetricsListRequest> getShardMetricsList) {
ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(errorOr(brokenPromiseToNever(
getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));
if(result.isError()) {
req.reply.sendError(result.getError());
} else {
GetDataDistributorMetricsReply rep;
rep.storageMetricsList = result.get();
req.reply.send(rep);
}
return Void();
}
ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db ) { ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db ) {
state Future<Void> dbInfoChange = db->onChange(); state Future<Void> dbInfoChange = db->onChange();
if (!setDDEnabled(false, snapReq.snapUID)) { if (!setDDEnabled(false, snapReq.snapUID)) {
@ -4940,16 +4955,8 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
TraceEvent("DataDistributorHalted", di.id()).detail("ReqID", req.requesterID); TraceEvent("DataDistributorHalted", di.id()).detail("ReqID", req.requesterID);
break; break;
} }
when ( state GetDataDistributorMetricsRequest req = waitNext(di.dataDistributorMetrics.getFuture()) ) { when(GetDataDistributorMetricsRequest req = waitNext(di.dataDistributorMetrics.getFuture())) {
ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(errorOr(brokenPromiseToNever( actors.add(ddGetMetrics(req, getShardMetricsList));
getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));
if ( result.isError() ) {
req.reply.sendError(result.getError());
} else {
GetDataDistributorMetricsReply rep;
rep.storageMetricsList = result.get();
req.reply.send(rep);
}
} }
when(DistributorSnapRequest snapReq = waitNext(di.distributorSnapReq.getFuture())) { when(DistributorSnapRequest snapReq = waitNext(di.distributorSnapReq.getFuture())) {
actors.add(ddSnapCreate(snapReq, db)); actors.add(ddSnapCreate(snapReq, db));