set max shard bandwidth
This commit is contained in:
parent
48582380fc
commit
b0a00effa0
|
@ -171,9 +171,13 @@ struct GetTopKMetricsRequest {
|
|||
MetricsComparator comparator; // Returns true if a.score > b.score; used to pick the topK largest entries in keys
|
||||
std::vector<KeyRange> keys;
|
||||
Promise<std::vector<StorageMetrics>> reply; // topK storage metrics
|
||||
double maxBytesReadPerKSecond = 0; // no returned shard has a read load above this value
|
||||
|
||||
GetTopKMetricsRequest() {}
|
||||
GetTopKMetricsRequest(std::vector<KeyRange> const& keys, int topK = 1) : topK(topK), keys(keys) {}
|
||||
GetTopKMetricsRequest(std::vector<KeyRange> const& keys,
|
||||
int topK = 1,
|
||||
double maxBytesReadPerKSecond = std::numeric_limits<double>::max())
|
||||
: topK(topK), keys(keys), maxBytesReadPerKSecond(maxBytesReadPerKSecond) {}
|
||||
};
|
||||
|
||||
struct GetMetricsListRequest {
|
||||
|
|
|
@ -1536,12 +1536,20 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
|
|||
traceEvent->detail("SkipReason", "SourceTeamThrottle");
|
||||
return false;
|
||||
}
|
||||
// check team difference
|
||||
auto srcLoad = sourceTeam->getLoadReadBandwidth(false), destLoad = destTeam->getLoadReadBandwidth();
|
||||
traceEvent->detail("SrcReadBandwidth", srcLoad).detail("DestReadBandwidth", destLoad);
|
||||
|
||||
// skip if the read bandwidth difference is at most 30% of the source team's load
|
||||
if (0.7 * srcLoad <= destLoad) {
|
||||
traceEvent->detail("SkipReason", "TeamTooSimilar");
|
||||
return false;
|
||||
}
|
||||
// TODO: set 10 as a knob
|
||||
// randomly choose topK shards
|
||||
int topK = std::min(int(0.1 * shards.size()), 10);
|
||||
state Future<HealthMetrics> healthMetrics = self->cx->getHealthMetrics(true);
|
||||
state GetTopKMetricsRequest req(shards, topK);
|
||||
state GetTopKMetricsRequest req(shards, topK, (srcLoad - destLoad) / 3.0);
|
||||
req.comparator = [](const StorageMetrics& a, const StorageMetrics& b) {
|
||||
return a.bytesReadPerKSecond / std::max(a.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES) >
|
||||
b.bytesReadPerKSecond / std::max(b.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES);
|
||||
|
@ -1567,16 +1575,7 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
|
|||
}
|
||||
|
||||
auto& metrics = metricsList[chosenIdx];
|
||||
auto srcLoad = sourceTeam->getLoadReadBandwidth(false), destLoad = destTeam->getLoadReadBandwidth();
|
||||
traceEvent->detail("ShardReadBandwidth", metrics.bytesReadPerKSecond)
|
||||
.detail("SrcReadBandwidth", srcLoad)
|
||||
.detail("DestReadBandwidth", destLoad);
|
||||
|
||||
if (srcLoad - destLoad <=
|
||||
5 * std::max(metrics.bytesReadPerKSecond, SERVER_KNOBS->SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS)) {
|
||||
traceEvent->detail("SkipReason", "TeamTooSimilar");
|
||||
return false;
|
||||
}
|
||||
traceEvent->detail("ShardReadBandwidth", metrics.bytesReadPerKSecond);
|
||||
// Verify the shard is still in ShardsAffectedByTeamFailure
|
||||
shards = self->shardsAffectedByTeamFailure->getShardsFor(
|
||||
ShardsAffectedByTeamFailure::Team(sourceTeam->getServerIDs(), primary));
|
||||
|
|
|
@ -857,8 +857,10 @@ ACTOR Future<Void> fetchTopKShardMetrics_impl(DataDistributionTracker* self, Get
|
|||
break;
|
||||
}
|
||||
|
||||
metrics.keys = range;
|
||||
returnMetrics.push_back(metrics);
|
||||
if (metrics.bytesReadPerKSecond <= req.maxBytesReadPerKSecond) {
|
||||
metrics.keys = range;
|
||||
returnMetrics.push_back(metrics);
|
||||
}
|
||||
}
|
||||
|
||||
if (!onChange.isValid()) {
|
||||
|
|
Loading…
Reference in New Issue