Merge branch 'readaware-perf' into readaware
This commit is contained in:
commit
178bab5faf
|
@ -605,7 +605,6 @@ struct StorageMetrics {
|
|||
int64_t bytesPerKSecond = 0; // network bandwidth (average over 10s)
|
||||
int64_t iosPerKSecond = 0;
|
||||
int64_t bytesReadPerKSecond = 0;
|
||||
Optional<KeyRange> keys; // this metric belongs to which range
|
||||
|
||||
static const int64_t infinity = 1LL << 60;
|
||||
|
||||
|
|
|
@ -166,17 +166,18 @@ struct GetMetricsRequest {
|
|||
};
|
||||
|
||||
struct GetTopKMetricsReply {
|
||||
std::vector<StorageMetrics> metrics;
|
||||
struct KeyRangeStorageMetrics {
|
||||
KeyRange range;
|
||||
StorageMetrics metrics;
|
||||
};
|
||||
std::vector<KeyRangeStorageMetrics> shardMetrics;
|
||||
double minReadLoad = -1, maxReadLoad = -1;
|
||||
GetTopKMetricsReply() {}
|
||||
GetTopKMetricsReply(std::vector<StorageMetrics> const& m, double minReadLoad, double maxReadLoad)
|
||||
: metrics(m), minReadLoad(minReadLoad), maxReadLoad(maxReadLoad) {}
|
||||
GetTopKMetricsReply(std::vector<KeyRangeStorageMetrics> const& m, double minReadLoad, double maxReadLoad)
|
||||
: shardMetrics(m), minReadLoad(minReadLoad), maxReadLoad(maxReadLoad) {}
|
||||
};
|
||||
struct GetTopKMetricsRequest {
|
||||
// whether a > b
|
||||
typedef std::function<bool(const StorageMetrics& a, const StorageMetrics& b)> MetricsComparator;
|
||||
int topK = 1; // default only return the top 1 shard based on the comparator
|
||||
MetricsComparator comparator; // Return true if a.score > b.score, return the largest topK in keys
|
||||
std::vector<KeyRange> keys;
|
||||
Promise<GetTopKMetricsReply> reply; // topK storage metrics
|
||||
double maxBytesReadPerKSecond = 0, minBytesReadPerKSecond = 0; // all returned shards won't exceed this read load
|
||||
|
@ -188,6 +189,20 @@ struct GetTopKMetricsRequest {
|
|||
double minBytesReadPerKSecond = 0)
|
||||
: topK(topK), keys(keys), maxBytesReadPerKSecond(maxBytesReadPerKSecond),
|
||||
minBytesReadPerKSecond(minBytesReadPerKSecond) {}
|
||||
|
||||
// Return true if a.score > b.score, return the largest topK in keys
|
||||
static bool compare(const GetTopKMetricsReply::KeyRangeStorageMetrics& a,
|
||||
const GetTopKMetricsReply::KeyRangeStorageMetrics& b) {
|
||||
return compareByReadDensity(a, b);
|
||||
}
|
||||
|
||||
private:
|
||||
// larger read density means higher score
|
||||
static bool compareByReadDensity(const GetTopKMetricsReply::KeyRangeStorageMetrics& a,
|
||||
const GetTopKMetricsReply::KeyRangeStorageMetrics& b) {
|
||||
return a.metrics.bytesReadPerKSecond / std::max(a.metrics.bytes * 1.0, 1.0) >
|
||||
b.metrics.bytesReadPerKSecond / std::max(b.metrics.bytes * 1.0, 1.0);
|
||||
}
|
||||
};
|
||||
|
||||
struct GetMetricsListRequest {
|
||||
|
|
|
@ -1562,10 +1562,6 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
|
|||
state Future<HealthMetrics> healthMetrics = self->cx->getHealthMetrics(true);
|
||||
state GetTopKMetricsRequest req(
|
||||
shards, topK, (srcLoad - destLoad) * SERVER_KNOBS->READ_REBALANCE_MAX_SHARD_FRAC, srcLoad / shards.size());
|
||||
req.comparator = [](const StorageMetrics& a, const StorageMetrics& b) {
|
||||
return a.bytesReadPerKSecond / std::max(a.bytes * 1.0, 1.0) >
|
||||
b.bytesReadPerKSecond / std::max(b.bytes * 1.0, 1.0);
|
||||
};
|
||||
state GetTopKMetricsReply reply = wait(brokenPromiseToNever(self->getTopKMetrics.getReply(req)));
|
||||
wait(ready(healthMetrics));
|
||||
auto cpu = getWorstCpu(healthMetrics.get(), sourceTeam->getServerIDs());
|
||||
|
@ -1574,31 +1570,29 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
|
|||
return false;
|
||||
}
|
||||
|
||||
auto& metricsList = reply.metrics;
|
||||
auto& metricsList = reply.shardMetrics;
|
||||
// NOTE: randomize is important here since we don't want to always push the same shard into the queue
|
||||
deterministicRandom()->randomShuffle(metricsList);
|
||||
traceEvent->detail("MinReadLoad", reply.minReadLoad).detail("MaxReadLoad", reply.maxReadLoad);
|
||||
|
||||
int chosenIdx = -1;
|
||||
for (int i = 0; i < metricsList.size(); ++i) {
|
||||
if (metricsList[i].keys.present()) {
|
||||
chosenIdx = i;
|
||||
break;
|
||||
}
|
||||
chosenIdx = i;
|
||||
break;
|
||||
}
|
||||
if (chosenIdx == -1) {
|
||||
traceEvent->detail("SkipReason", "NoEligibleShards");
|
||||
return false;
|
||||
}
|
||||
|
||||
auto& metrics = metricsList[chosenIdx];
|
||||
auto& [shard, metrics] = metricsList[chosenIdx];
|
||||
traceEvent->detail("ShardReadBandwidth", metrics.bytesReadPerKSecond);
|
||||
// Verify the shard is still in ShardsAffectedByTeamFailure
|
||||
shards = self->shardsAffectedByTeamFailure->getShardsFor(
|
||||
ShardsAffectedByTeamFailure::Team(sourceTeam->getServerIDs(), primary));
|
||||
for (int i = 0; i < shards.size(); i++) {
|
||||
if (metrics.keys == shards[i]) {
|
||||
self->output.send(RelocateShard(metrics.keys.get(), priority, RelocateReason::REBALANCE_READ));
|
||||
if (shard == shards[i]) {
|
||||
self->output.send(RelocateShard(shard, priority, RelocateReason::REBALANCE_READ));
|
||||
self->updateLastAsSource(sourceTeam->getServerIDs());
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -833,9 +833,8 @@ ACTOR Future<Void> trackInitialShards(DataDistributionTracker* self, Reference<I
|
|||
}
|
||||
|
||||
ACTOR Future<Void> fetchTopKShardMetrics_impl(DataDistributionTracker* self, GetTopKMetricsRequest req) {
|
||||
ASSERT(req.comparator);
|
||||
state Future<Void> onChange;
|
||||
state std::vector<StorageMetrics> returnMetrics;
|
||||
state std::vector<GetTopKMetricsReply::KeyRangeStorageMetrics> returnMetrics;
|
||||
// random pick a portion of shard
|
||||
if (req.keys.size() > SERVER_KNOBS->DD_SHARD_COMPARE_LIMIT) {
|
||||
deterministicRandom()->randomShuffle(req.keys, SERVER_KNOBS->DD_SHARD_COMPARE_LIMIT);
|
||||
|
@ -869,8 +868,7 @@ ACTOR Future<Void> fetchTopKShardMetrics_impl(DataDistributionTracker* self, Get
|
|||
maxReadLoad = std::max(metrics.bytesReadPerKSecond, maxReadLoad);
|
||||
if (req.minBytesReadPerKSecond <= metrics.bytesReadPerKSecond &&
|
||||
metrics.bytesReadPerKSecond <= req.maxBytesReadPerKSecond) {
|
||||
metrics.keys = range;
|
||||
returnMetrics.push_back(metrics);
|
||||
returnMetrics.push_back({ range, metrics });
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -881,14 +879,12 @@ ACTOR Future<Void> fetchTopKShardMetrics_impl(DataDistributionTracker* self, Get
|
|||
if (req.topK >= returnMetrics.size())
|
||||
req.reply.send(GetTopKMetricsReply(returnMetrics, minReadLoad, maxReadLoad));
|
||||
else {
|
||||
std::nth_element(returnMetrics.begin(),
|
||||
returnMetrics.begin() + req.topK - 1,
|
||||
returnMetrics.end(),
|
||||
req.comparator);
|
||||
req.reply.send(GetTopKMetricsReply(
|
||||
std::vector<StorageMetrics>(returnMetrics.begin(), returnMetrics.begin() + req.topK),
|
||||
minReadLoad,
|
||||
maxReadLoad));
|
||||
std::nth_element(
|
||||
returnMetrics.begin(), returnMetrics.begin() + req.topK - 1, returnMetrics.end(), req.compare);
|
||||
req.reply.send(GetTopKMetricsReply(std::vector<GetTopKMetricsReply::KeyRangeStorageMetrics>(
|
||||
returnMetrics.begin(), returnMetrics.begin() + req.topK),
|
||||
minReadLoad,
|
||||
maxReadLoad));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue