add more statistics

Xiaoxi Wang 2022-05-17 10:19:09 -07:00
parent 3b241955e7
commit f40a2a68fc
3 changed files with 24 additions and 11 deletions


@@ -164,13 +164,20 @@ struct GetMetricsRequest {
 	GetMetricsRequest(KeyRange const& keys) : keys(keys) {}
 };
+struct GetTopKMetricsReply {
+	std::vector<StorageMetrics> metrics;
+	double minReadLoad = -1, maxReadLoad = -1;
+	GetTopKMetricsReply() {}
+	GetTopKMetricsReply(std::vector<StorageMetrics> const& m, double minReadLoad, double maxReadLoad)
+	  : metrics(m), minReadLoad(minReadLoad), maxReadLoad(maxReadLoad) {}
+};
 struct GetTopKMetricsRequest {
 	// whether a > b
 	typedef std::function<bool(const StorageMetrics& a, const StorageMetrics& b)> MetricsComparator;
 	int topK = 1; // default only return the top 1 shard based on the comparator
 	MetricsComparator comparator; // Return true if a.score > b.score, return the largest topK in keys
 	std::vector<KeyRange> keys;
-	Promise<std::vector<StorageMetrics>> reply; // topK storage metrics
+	Promise<GetTopKMetricsReply> reply; // topK storage metrics
 	double maxBytesReadPerKSecond = 0; // all returned shards won't exceed this read load
 	GetTopKMetricsRequest() {}
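
For context, here is a minimal standalone sketch (not part of this commit) of the selection pattern the topK/comparator fields drive: the comparator defines "a > b", and std::nth_element partitions the candidate list so the K "largest" shards come first, which is how fetchTopKShardMetrics_impl uses it further down. ShardLoad and the sample numbers are made up for illustration; the real type is StorageMetrics.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

// Made-up stand-in for StorageMetrics, just enough for the comparator.
struct ShardLoad {
    int64_t bytes = 0;
    int64_t bytesReadPerKSecond = 0;
};

int main() {
    std::vector<ShardLoad> shards = {
        { 1000, 50 }, { 2000, 400 }, { 500, 10 }, { 800, 300 }, { 1500, 120 }
    };

    // "Return true if a.score > b.score": here the score is read density,
    // i.e. read bandwidth normalized by shard size (cf. the comparator built in rebalanceReadLoad).
    auto comparator = [](const ShardLoad& a, const ShardLoad& b) {
        return a.bytesReadPerKSecond / std::max<double>(a.bytes, 1.0) >
               b.bytesReadPerKSecond / std::max<double>(b.bytes, 1.0);
    };

    // Partition so the topK "largest" shards (per comparator) come first, then keep only them.
    const size_t topK = 2;
    std::nth_element(shards.begin(), shards.begin() + topK - 1, shards.end(), comparator);
    shards.resize(topK);

    for (const auto& s : shards)
        std::printf("kept shard: bytes=%lld read/ksec=%lld\n",
                    (long long)s.bytes, (long long)s.bytesReadPerKSecond);
    return 0;
}

Note that std::nth_element only guarantees a partition rather than a fully sorted prefix, which is all a "give me the top K" request needs.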


@@ -1554,7 +1554,7 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
 	    return a.bytesReadPerKSecond / std::max(a.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES) >
 	           b.bytesReadPerKSecond / std::max(b.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES);
 	};
-	state std::vector<StorageMetrics> metricsList = wait(brokenPromiseToNever(self->getTopKMetrics.getReply(req)));
+	state GetTopKMetricsReply reply = wait(brokenPromiseToNever(self->getTopKMetrics.getReply(req)));
 	wait(ready(healthMetrics));
 	auto cpu = getWorstCpu(healthMetrics.get(), sourceTeam->getServerIDs());
 	if (cpu < SERVER_KNOBS->READ_REBALANCE_CPU_THRESHOLD) { // 15.0 +- (0.3 * 15) < 20.0
@@ -1562,10 +1562,10 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
 		return false;
 	}
-	if (!metricsList.empty()) {
-		traceEvent->detail("KthReadLoad1", metricsList[metricsList.size() - 1].bytesReadPerKSecond)
-		    .detail("KthReadLoad2", metricsList[0].bytesReadPerKSecond);
-	}
+	auto& metricsList = reply.metrics;
+	// NOTE: randomize is important here since we don't want to always push the same shard into the queue
+	deterministicRandom()->randomShuffle(metricsList);
+	traceEvent->detail("MinReadLoad", reply.minReadLoad).detail("MaxReadLoad", reply.maxReadLoad);
 	int chosenIdx = -1;
 	for (int i = 0; i < metricsList.size(); ++i) {
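
The NOTE about randomizing is worth a concrete picture: if the reply always listed candidates in the same order, the pick-the-first-eligible loop that follows would keep moving the same shard. Below is a standalone sketch (not part of the commit) of the shuffle-then-pick pattern; the eligibility threshold is invented and std::mt19937 stands in for deterministicRandom().

#include <algorithm>
#include <cstdio>
#include <random>
#include <vector>

int main() {
    std::vector<int> readLoads = { 900, 850, 820, 790, 100 }; // per-shard read load
    std::mt19937 rng(std::random_device{}());

    // Shuffle first so repeated calls spread their picks across all eligible shards.
    std::shuffle(readLoads.begin(), readLoads.end(), rng);

    // Pick the first shard that is "hot enough" to be worth moving (threshold is made up).
    int chosenIdx = -1;
    for (int i = 0; i < (int)readLoads.size(); ++i) {
        if (readLoads[i] >= 500) {
            chosenIdx = i;
            break;
        }
    }

    if (chosenIdx >= 0)
        std::printf("chose shard with read load %d\n", readLoads[chosenIdx]);
    else
        std::printf("no eligible shard\n");
    return 0;
}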


@@ -844,7 +844,8 @@ ACTOR Future<Void> fetchTopKShardMetrics_impl(DataDistributionTracker* self, Get
 	loop {
 		onChange = Future<Void>();
 		returnMetrics.clear();
+		state int64_t minReadLoad = std::numeric_limits<int64_t>::max();
+		state int64_t maxReadLoad = std::numeric_limits<int64_t>::min();
 		state int i;
 		for (i = 0; i < SERVER_KNOBS->DD_SHARD_COMPARE_LIMIT && i < req.keys.size(); ++i) {
 			auto range = req.keys[i];
@@ -863,6 +864,9 @@ ACTOR Future<Void> fetchTopKShardMetrics_impl(DataDistributionTracker* self, Get
 				break;
 			}
+			minReadLoad = std::min(metrics.bytesReadPerKSecond, minReadLoad);
+			maxReadLoad = std::max(metrics.bytesReadPerKSecond, maxReadLoad);
 			if (metrics.bytesReadPerKSecond > 0 && metrics.bytesReadPerKSecond <= req.maxBytesReadPerKSecond) {
 				metrics.keys = range;
 				returnMetrics.push_back(metrics);
@@ -873,14 +877,16 @@ ACTOR Future<Void> fetchTopKShardMetrics_impl(DataDistributionTracker* self, Get
 		// FIXME(xwang): Do we need to track slow task here?
 		if (!onChange.isValid()) {
 			if (req.topK >= returnMetrics.size())
-				req.reply.send(returnMetrics);
+				req.reply.send(GetTopKMetricsReply(returnMetrics, minReadLoad, maxReadLoad));
 			else {
 				std::nth_element(returnMetrics.begin(),
 				                 returnMetrics.begin() + req.topK - 1,
 				                 returnMetrics.end(),
 				                 req.comparator);
-				req.reply.send(
-				    std::vector<StorageMetrics>(returnMetrics.begin(), returnMetrics.begin() + req.topK));
+				req.reply.send(GetTopKMetricsReply(
+				    std::vector<StorageMetrics>(returnMetrics.begin(), returnMetrics.begin() + req.topK),
+				    minReadLoad,
+				    maxReadLoad));
 			}
 			return Void();
 		}
@@ -898,7 +904,7 @@ ACTOR Future<Void> fetchTopKShardMetrics(DataDistributionTracker* self, GetTopKM
 		when(wait(fetchTopKShardMetrics_impl(self, req))) {}
 		when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) {
 			TEST(true); // TopK DD_SHARD_METRICS_TIMEOUT
-			req.reply.send(std::vector<StorageMetrics>(1));
+			req.reply.send(GetTopKMetricsReply());
 		}
 	}
 	return Void();
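
Finally, the tracker-side statistics added above follow the usual running min/max pattern: start from the int64_t extremes and fold in each shard's read load. Below is a standalone sketch (not part of the commit); mapping an empty scan to -1 is a simplification chosen here to match what a default-constructed GetTopKMetricsReply (which the timeout path sends) reports.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <limits>
#include <vector>

int main() {
    std::vector<int64_t> readLoads = { 120, 45, 300, 7 }; // made-up per-shard read loads

    // Sentinel extremes, then fold each observation in.
    int64_t minReadLoad = std::numeric_limits<int64_t>::max();
    int64_t maxReadLoad = std::numeric_limits<int64_t>::min();
    for (int64_t load : readLoads) {
        minReadLoad = std::min(load, minReadLoad);
        maxReadLoad = std::max(load, maxReadLoad);
    }

    // Report -1/-1 when nothing was scanned (the "no data" values of the default reply).
    double replyMin = readLoads.empty() ? -1 : (double)minReadLoad;
    double replyMax = readLoads.empty() ? -1 : (double)maxReadLoad;
    std::printf("minReadLoad=%g maxReadLoad=%g\n", replyMin, replyMax);
    return 0;
}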