diff --git a/fdbserver/DataDistribution.actor.h b/fdbserver/DataDistribution.actor.h index 40b50bb824..004d4424d2 100644 --- a/fdbserver/DataDistribution.actor.h +++ b/fdbserver/DataDistribution.actor.h @@ -164,13 +164,20 @@ struct GetMetricsRequest { GetMetricsRequest(KeyRange const& keys) : keys(keys) {} }; +struct GetTopKMetricsReply { + std::vector metrics; + double minReadLoad = -1, maxReadLoad = -1; + GetTopKMetricsReply() {} + GetTopKMetricsReply(std::vector const& m, double minReadLoad, double maxReadLoad) + : metrics(m), minReadLoad(minReadLoad), maxReadLoad(maxReadLoad) {} +}; struct GetTopKMetricsRequest { // whether a > b typedef std::function MetricsComparator; int topK = 1; // default only return the top 1 shard based on the comparator MetricsComparator comparator; // Return true if a.score > b.score, return the largest topK in keys std::vector keys; - Promise> reply; // topK storage metrics + Promise reply; // topK storage metrics double maxBytesReadPerKSecond = 0; // all returned shards won't exceed this read load GetTopKMetricsRequest() {} diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index 03c25feb7c..c9d270ef18 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -1554,7 +1554,7 @@ ACTOR Future rebalanceReadLoad(DDQueueData* self, return a.bytesReadPerKSecond / std::max(a.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES) > b.bytesReadPerKSecond / std::max(b.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES); }; - state std::vector metricsList = wait(brokenPromiseToNever(self->getTopKMetrics.getReply(req))); + state GetTopKMetricsReply reply = wait(brokenPromiseToNever(self->getTopKMetrics.getReply(req))); wait(ready(healthMetrics)); auto cpu = getWorstCpu(healthMetrics.get(), sourceTeam->getServerIDs()); if (cpu < SERVER_KNOBS->READ_REBALANCE_CPU_THRESHOLD) { // 15.0 +- (0.3 * 15) < 20.0 @@ -1562,10 +1562,10 @@ ACTOR Future rebalanceReadLoad(DDQueueData* self, return false; } - if (!metricsList.empty()) { - traceEvent->detail("KthReadLoad1", metricsList[metricsList.size() - 1].bytesReadPerKSecond) - .detail("KthReadLoad2", metricsList[0].bytesReadPerKSecond); - } + auto& metricsList = reply.metrics; + // NOTE: randomize is important here since we don't want to always push the same shard into the queue + deterministicRandom()->randomShuffle(metricsList); + traceEvent->detail("MinReadLoad", reply.minReadLoad).detail("MaxReadLoad", reply.maxReadLoad); int chosenIdx = -1; for (int i = 0; i < metricsList.size(); ++i) { diff --git a/fdbserver/DataDistributionTracker.actor.cpp b/fdbserver/DataDistributionTracker.actor.cpp index d78bbb8469..d9b5073496 100644 --- a/fdbserver/DataDistributionTracker.actor.cpp +++ b/fdbserver/DataDistributionTracker.actor.cpp @@ -844,7 +844,8 @@ ACTOR Future fetchTopKShardMetrics_impl(DataDistributionTracker* self, Get loop { onChange = Future(); returnMetrics.clear(); - + state int64_t minReadLoad = std::numeric_limits::max(); + state int64_t maxReadLoad = std::numeric_limits::min(); state int i; for (i = 0; i < SERVER_KNOBS->DD_SHARD_COMPARE_LIMIT && i < req.keys.size(); ++i) { auto range = req.keys[i]; @@ -863,6 +864,9 @@ ACTOR Future fetchTopKShardMetrics_impl(DataDistributionTracker* self, Get break; } + minReadLoad = std::min(metrics.bytesReadPerKSecond, minReadLoad); + maxReadLoad = std::max(metrics.bytesReadPerKSecond, maxReadLoad); + if (metrics.bytesReadPerKSecond > 0 && metrics.bytesReadPerKSecond <= req.maxBytesReadPerKSecond) { metrics.keys = range; returnMetrics.push_back(metrics); @@ -873,14 +877,16 @@ ACTOR Future fetchTopKShardMetrics_impl(DataDistributionTracker* self, Get // FIXME(xwang): Do we need to track slow task here? if (!onChange.isValid()) { if (req.topK >= returnMetrics.size()) - req.reply.send(returnMetrics); + req.reply.send(GetTopKMetricsReply(returnMetrics, minReadLoad, maxReadLoad)); else { std::nth_element(returnMetrics.begin(), returnMetrics.begin() + req.topK - 1, returnMetrics.end(), req.comparator); - req.reply.send( - std::vector(returnMetrics.begin(), returnMetrics.begin() + req.topK)); + req.reply.send(GetTopKMetricsReply( + std::vector(returnMetrics.begin(), returnMetrics.begin() + req.topK), + minReadLoad, + maxReadLoad)); } return Void(); } @@ -898,7 +904,7 @@ ACTOR Future fetchTopKShardMetrics(DataDistributionTracker* self, GetTopKM when(wait(fetchTopKShardMetrics_impl(self, req))) {} when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) { TEST(true); // TopK DD_SHARD_METRICS_TIMEOUT - req.reply.send(std::vector(1)); + req.reply.send(GetTopKMetricsReply()); } } return Void();