solve review comments
This commit is contained in:
parent
7865ead54d
commit
cd13356964
|
@ -1795,17 +1795,12 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
|
||||||
deterministicRandom()->randomShuffle(metricsList);
|
deterministicRandom()->randomShuffle(metricsList);
|
||||||
traceEvent->detail("MinReadLoad", reply.minReadLoad).detail("MaxReadLoad", reply.maxReadLoad);
|
traceEvent->detail("MinReadLoad", reply.minReadLoad).detail("MaxReadLoad", reply.maxReadLoad);
|
||||||
|
|
||||||
int chosenIdx = -1;
|
if (metricsList.empty()) {
|
||||||
for (int i = 0; i < metricsList.size(); ++i) {
|
|
||||||
chosenIdx = i;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (chosenIdx == -1) {
|
|
||||||
traceEvent->detail("SkipReason", "NoEligibleShards");
|
traceEvent->detail("SkipReason", "NoEligibleShards");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto& [shard, metrics] = metricsList[chosenIdx];
|
auto& [shard, metrics] = metricsList[0];
|
||||||
traceEvent->detail("ShardReadBandwidth", metrics.bytesReadPerKSecond);
|
traceEvent->detail("ShardReadBandwidth", metrics.bytesReadPerKSecond);
|
||||||
// Verify the shard is still in ShardsAffectedByTeamFailure
|
// Verify the shard is still in ShardsAffectedByTeamFailure
|
||||||
shards = self->shardsAffectedByTeamFailure->getShardsFor(
|
shards = self->shardsAffectedByTeamFailure->getShardsFor(
|
||||||
|
|
|
@ -866,7 +866,7 @@ ACTOR Future<Void> fetchTopKShardMetrics_impl(DataDistributionTracker* self, Get
|
||||||
maxReadLoad = std::max(metrics.bytesReadPerKSecond, maxReadLoad);
|
maxReadLoad = std::max(metrics.bytesReadPerKSecond, maxReadLoad);
|
||||||
if (req.minBytesReadPerKSecond <= metrics.bytesReadPerKSecond &&
|
if (req.minBytesReadPerKSecond <= metrics.bytesReadPerKSecond &&
|
||||||
metrics.bytesReadPerKSecond <= req.maxBytesReadPerKSecond) {
|
metrics.bytesReadPerKSecond <= req.maxBytesReadPerKSecond) {
|
||||||
returnMetrics.push_back({ range, metrics });
|
returnMetrics.emplace_back(range, metrics);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -877,8 +877,10 @@ ACTOR Future<Void> fetchTopKShardMetrics_impl(DataDistributionTracker* self, Get
|
||||||
if (req.topK >= returnMetrics.size())
|
if (req.topK >= returnMetrics.size())
|
||||||
req.reply.send(GetTopKMetricsReply(returnMetrics, minReadLoad, maxReadLoad));
|
req.reply.send(GetTopKMetricsReply(returnMetrics, minReadLoad, maxReadLoad));
|
||||||
else {
|
else {
|
||||||
std::nth_element(
|
std::nth_element(returnMetrics.begin(),
|
||||||
returnMetrics.begin(), returnMetrics.begin() + req.topK - 1, returnMetrics.end(), req.compare);
|
returnMetrics.begin() + req.topK - 1,
|
||||||
|
returnMetrics.end(),
|
||||||
|
GetTopKMetricsRequest::compare);
|
||||||
req.reply.send(GetTopKMetricsReply(std::vector<GetTopKMetricsReply::KeyRangeStorageMetrics>(
|
req.reply.send(GetTopKMetricsReply(std::vector<GetTopKMetricsReply::KeyRangeStorageMetrics>(
|
||||||
returnMetrics.begin(), returnMetrics.begin() + req.topK),
|
returnMetrics.begin(), returnMetrics.begin() + req.topK),
|
||||||
minReadLoad,
|
minReadLoad,
|
||||||
|
|
|
@ -231,6 +231,8 @@ struct GetTopKMetricsReply {
|
||||||
struct KeyRangeStorageMetrics {
|
struct KeyRangeStorageMetrics {
|
||||||
KeyRange range;
|
KeyRange range;
|
||||||
StorageMetrics metrics;
|
StorageMetrics metrics;
|
||||||
|
KeyRangeStorageMetrics() = default;
|
||||||
|
KeyRangeStorageMetrics(const KeyRange& range, const StorageMetrics& s) : range(range), metrics(s) {}
|
||||||
};
|
};
|
||||||
std::vector<KeyRangeStorageMetrics> shardMetrics;
|
std::vector<KeyRangeStorageMetrics> shardMetrics;
|
||||||
double minReadLoad = -1, maxReadLoad = -1;
|
double minReadLoad = -1, maxReadLoad = -1;
|
||||||
|
@ -239,7 +241,7 @@ struct GetTopKMetricsReply {
|
||||||
: shardMetrics(m), minReadLoad(minReadLoad), maxReadLoad(maxReadLoad) {}
|
: shardMetrics(m), minReadLoad(minReadLoad), maxReadLoad(maxReadLoad) {}
|
||||||
};
|
};
|
||||||
struct GetTopKMetricsRequest {
|
struct GetTopKMetricsRequest {
|
||||||
int topK = 1; // default only return the top 1 shard based on the comparator
|
int topK = 1; // default only return the top 1 shard based on the GetTopKMetricsRequest::compare function
|
||||||
std::vector<KeyRange> keys;
|
std::vector<KeyRange> keys;
|
||||||
Promise<GetTopKMetricsReply> reply; // topK storage metrics
|
Promise<GetTopKMetricsReply> reply; // topK storage metrics
|
||||||
double maxBytesReadPerKSecond = 0, minBytesReadPerKSecond = 0; // all returned shards won't exceed this read load
|
double maxBytesReadPerKSecond = 0, minBytesReadPerKSecond = 0; // all returned shards won't exceed this read load
|
||||||
|
|
Loading…
Reference in New Issue