update DDMetrics periodically
This commit is contained in:
parent
d1cc87452c
commit
6db7040f17
|
@ -340,6 +340,7 @@ public:
|
|||
double detailedHealthMetricsLastUpdated;
|
||||
Smoother smoothAvgShardSize;
|
||||
int avgShardSizeLastUpdated;
|
||||
Optional<Future<Void>> avgShardSizeUpdater;
|
||||
|
||||
UniqueOrderedOptionList<FDBTransactionOptions> transactionDefaults;
|
||||
|
||||
|
|
|
@ -3303,6 +3303,9 @@ void Transaction::setupWatches() {
|
|||
|
||||
ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Transaction* self,
|
||||
CommitTransactionRef* transaction) {
|
||||
if(!self->getDatabase()->avgShardSizeUpdater.present()){ // lazy start: only client who needs tag pay this overload
|
||||
self->getDatabase()->avgShardSizeUpdater = monitorDDMetricsChanges(self->getDatabase());
|
||||
}
|
||||
state ClientTrCommitCostEstimation trCommitCosts;
|
||||
state KeyRange keyRange;
|
||||
state int i = 0;
|
||||
|
@ -4372,6 +4375,36 @@ ACTOR Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsLis
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> monitorDDMetricsChanges(Database cx) {
|
||||
state bool isFirstRep = true;
|
||||
state ErrorOr<GetDDMetricsReply> rep;
|
||||
loop {
|
||||
double elapsed = now() - cx->avgShardSizeLastUpdated;
|
||||
if (elapsed < CLIENT_KNOBS->AVG_SHARD_SIZE_MAX_STALENESS)
|
||||
wait(delay(CLIENT_KNOBS->AVG_SHARD_SIZE_MAX_STALENESS - elapsed));
|
||||
|
||||
wait(store(rep,errorOr(basicLoadBalance(
|
||||
cx->getMasterProxies(false), &MasterProxyInterface::getDDMetrics,
|
||||
GetDDMetricsRequest(normalKeys, std::numeric_limits<int>::max(), true)))));
|
||||
if(rep.isError()) {
|
||||
TraceEvent(SevWarn,"NAPIMonitorDDMetricsChangeError").error(rep.getError());
|
||||
break; // Handle error
|
||||
}
|
||||
ASSERT(rep.get().avgShardSize.present());
|
||||
double avgShardSize = rep.get().avgShardSize.get();
|
||||
if(avgShardSize > 0) {
|
||||
if (isFirstRep) {
|
||||
cx->smoothAvgShardSize.reset(avgShardSize);
|
||||
isFirstRep = false;
|
||||
} else cx->smoothAvgShardSize.setTotal(avgShardSize);
|
||||
|
||||
cx->avgShardSizeLastUpdated = now();
|
||||
}
|
||||
if(deterministicRandom()->random01() < 0.01)
|
||||
TraceEvent("NAPIMeanShardSizex100").detail("SmoothTotal", cx->smoothAvgShardSize.smoothTotal());
|
||||
};
|
||||
}
|
||||
|
||||
Future<Standalone<VectorRef<KeyRangeRef>>> Transaction::getReadHotRanges(KeyRange const& keys) {
|
||||
return ::getReadHotRanges(cx, keys);
|
||||
}
|
||||
|
|
|
@ -345,6 +345,7 @@ private:
|
|||
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanID spanContext);
|
||||
ACTOR Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsList(Database cx, KeyRange keys,
|
||||
int shardLimit);
|
||||
ACTOR Future<Void> monitorDDMetricsChanges(Database cx);
|
||||
|
||||
std::string unprintable( const std::string& );
|
||||
|
||||
|
|
|
@ -1898,6 +1898,11 @@ ACTOR Future<Void> ddMetricsRequestServer(MasterProxyInterface proxy, Reference<
|
|||
choose {
|
||||
when(state GetDDMetricsRequest req = waitNext(proxy.getDDMetrics.getFuture()))
|
||||
{
|
||||
// TraceEvent("DDMetricsRequestServer").detail("HasDistributor", db->get().distributor.present());
|
||||
if(!db->get().distributor.present()) {
|
||||
req.reply.sendError(dd_cancelled());
|
||||
continue;
|
||||
}
|
||||
ErrorOr<GetDataDistributorMetricsReply> reply =
|
||||
wait(errorOr(db->get().distributor.get().dataDistributorMetrics.getReply(
|
||||
GetDataDistributorMetricsRequest(req.keys, req.shardLimit, req.avgOnly))));
|
||||
|
|
|
@ -890,32 +890,6 @@ Future<Void> refreshStorageServerCommitCost(RatekeeperData *self) {
|
|||
self->lastBusiestCommitTagPick = now();
|
||||
return Void();
|
||||
}
|
||||
//ACTOR Future<Void> monitorDDMetricsChanges(RatekeeperData *self, Reference<AsyncVar<ServerDBInfo>> db) {
|
||||
// state bool isFirstRep = true;
|
||||
// loop {
|
||||
// wait(delay(SERVER_KNOBS->DD_ENABLED_CHECK_DELAY));
|
||||
// if(!db->get().distributor.present()) continue;
|
||||
//
|
||||
// ErrorOr<GetDataDistributorMetricsReply> reply = wait(errorOr(
|
||||
// db->get().distributor.get().dataDistributorMetrics.getReply( GetDataDistributorMetricsRequest(normalKeys, std::numeric_limits<int>::max(), true) ) ) );
|
||||
//
|
||||
// if(reply.isError()) {
|
||||
// TraceEvent(SevWarn,"RkMonitorDDMetricsChangeError").error(reply.getError());
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// ASSERT(reply.get().avgShardSize.present());
|
||||
// double meanShardSize = reply.get().avgShardSize.get();
|
||||
// if(meanShardSize > 0) {
|
||||
// if (isFirstRep) {
|
||||
// self->smoothMeanShardSize.reset(meanShardSize);
|
||||
// isFirstRep = false;
|
||||
// } else self->smoothMeanShardSize.setTotal(meanShardSize);
|
||||
// }
|
||||
// if(deterministicRandom()->random01() < 0.01)
|
||||
// TraceEvent("RkMeanShardSizex100").detail("SmoothTotal", self->smoothMeanShardSize.smoothTotal());
|
||||
// };
|
||||
//}
|
||||
|
||||
void tryAutoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss) {
|
||||
if(ss.busiestReadTag.present() && ss.busiestReadTagFractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && ss.busiestReadTagRate > SERVER_KNOBS->MIN_TAG_COST) {
|
||||
|
|
Loading…
Reference in New Issue