topK shard random selection
commit 04311d001e
parent a0aac83085
@@ -1473,7 +1473,7 @@ public:
         wait(delay(SERVER_KNOBS->DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY));
         state std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor(
             ShardsAffectedByTeamFailure::Team(team->getServerIDs(), self->primary));
-        state std::vector<Future<StorageMetrics>> sizes;
+        state std::vector<Future<std::vector<StorageMetrics>>> sizes;
         sizes.reserve(shards.size());

         for (auto const& shard : shards) {
@@ -1488,7 +1488,7 @@ public:

         int64_t bytesLost = 0;
         for (auto const& size : sizes) {
-            bytesLost += size.get().bytes;
+            bytesLost += size.get()[0].bytes;
         }

         TraceEvent(SevWarnAlways, "DDZeroServerLeftInTeam", self->distributorId)
@@ -155,13 +155,14 @@ struct GetMetricsRequest {
     // whether a < b
     typedef std::function<bool(const StorageMetrics& a, const StorageMetrics& b)> MetricsComparator;
     std::vector<KeyRange> keys;
-    Promise<StorageMetrics> reply;
+    int topK = 1; // default only return the top 1 shard based on the comparator
+    Promise<std::vector<StorageMetrics>> reply; // topK storage metrics
     Optional<MetricsComparator>
-        comparator; // if comparator is assigned, return the largest one in keys, otherwise return the sum of metrics
+        comparator; // if comparator is assigned, return the largest topK in keys, otherwise return the sum of metrics

     GetMetricsRequest() {}
-    GetMetricsRequest(KeyRange const& keys) : keys({ keys }) {}
-    GetMetricsRequest(std::vector<KeyRange> const& keys) : keys(keys) {}
+    GetMetricsRequest(KeyRange const& keys, int topK = 1) : keys({ keys }), topK(topK) {}
+    GetMetricsRequest(std::vector<KeyRange> const& keys, int topK = 1) : keys(keys), topK(topK) {}
 };

 struct GetMetricsListRequest {
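A minimal usage fragment (not additional code from this commit), assuming a std::vector<KeyRange> named shards; it mirrors how the rebalanceReadLoad hunk further down fills in the new fields:

```cpp
// Sketch of the new request shape: ask for up to topK shards ranked by a comparator.
GetMetricsRequest req(shards, 100); // top 100 shards instead of the single largest
req.comparator = [](const StorageMetrics& a, const StorageMetrics& b) {
    // rank shards by read bandwidth normalized by shard size
    return a.bytesReadPerKSecond / std::max(a.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES) <
           b.bytesReadPerKSecond / std::max(b.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES);
};
// The reply promise now carries a std::vector<StorageMetrics> with up to topK entries.
```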
@@ -1146,8 +1146,9 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
         self->suppressIntervals = 0;
     }

-    state StorageMetrics metrics =
+    std::vector<StorageMetrics> metricsList =
         wait(brokenPromiseToNever(self->getShardMetrics.getReply(GetMetricsRequest(rd.keys))));
+    state StorageMetrics metrics = metricsList[0];

     ASSERT(rd.src.size());
     loop {
@@ -1521,45 +1522,55 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
         return false;
     }

-    // TODO: set 1000 as a knob
-    // randomly compare a portion of all shards
-    int shuffleLen = std::min((int)(shards.size() * 0.67), 1000);
-    deterministicRandom()->randomShuffle(shards, shuffleLen);
+    // TODO: set 100 as a knob
+    // randomly choose topK shards
     state Future<HealthMetrics> healthMetrics = self->cx->getHealthMetrics(true);
-    state GetMetricsRequest req(std::vector<KeyRange>(shards.begin(), shards.begin() + shuffleLen));
+    state GetMetricsRequest req(shards, 100);
     req.comparator = [](const StorageMetrics& a, const StorageMetrics& b) {
         return a.bytesReadPerKSecond / std::max(a.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES) <
                b.bytesReadPerKSecond / std::max(b.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES);
     };
-    state StorageMetrics metrics = wait(brokenPromiseToNever(self->getShardMetrics.getReply(req)));
+    state std::vector<StorageMetrics> metricsList = wait(brokenPromiseToNever(self->getShardMetrics.getReply(req)));
     wait(ready(healthMetrics));
     if (getWorstCpu(healthMetrics.get()) < 25.0) { // 25%
         traceEvent->detail("SkipReason", "LowReadLoad");
         return false;
     }
-    if (metrics.keys.present() && metrics.bytes > 0) {
-        auto srcLoad = sourceTeam->getLoadReadBandwidth(false), destLoad = destTeam->getLoadReadBandwidth();
-        traceEvent->detail("ShardReadBandwidth", metrics.bytesReadPerKSecond)
-            .detail("SrcReadBandwidth", srcLoad)
-            .detail("DestReadBandwidth", destLoad);
-
-        if (srcLoad - destLoad <=
-            5 * std::max(metrics.bytesReadPerKSecond, SERVER_KNOBS->SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS)) {
-            traceEvent->detail("SkipReason", "TeamTooSimilar");
-            return false;
-        }
-        // Verify the shard is still in ShardsAffectedByTeamFailure
-        shards = self->shardsAffectedByTeamFailure->getShardsFor(
-            ShardsAffectedByTeamFailure::Team(sourceTeam->getServerIDs(), primary));
-        for (int i = 0; i < shards.size(); i++) {
-            if (metrics.keys == shards[i]) {
-                self->output.send(RelocateShard(metrics.keys.get(), priority, RelocateReason::REBALANCE_READ));
-                return true;
-            }
-        }
-        traceEvent->detail("SkipReason", "ShardNotPresent");
-    } else
-        traceEvent->detail("SkipReason", metrics.keys.present() ? "ShardZeroSize" : "ShardNoKeys");
+    int chosenIdx = -1;
+    for (int i = 0; i < SERVER_KNOBS->REBALANCE_MAX_RETRIES; ++i) {
+        int idx = deterministicRandom()->randomInt(0, metricsList.size());
+        if (metricsList[idx].keys.present() && metricsList[idx].bytes > 0) {
+            chosenIdx = idx;
+            break;
+        }
+    }
+    if (chosenIdx == -1) {
+        traceEvent->detail("SkipReason", "NoEligibleShards");
+        return false;
+    }
+
+    auto& metrics = metricsList[chosenIdx];
+    auto srcLoad = sourceTeam->getLoadReadBandwidth(false), destLoad = destTeam->getLoadReadBandwidth();
+    traceEvent->detail("ShardReadBandwidth", metrics.bytesReadPerKSecond)
+        .detail("SrcReadBandwidth", srcLoad)
+        .detail("DestReadBandwidth", destLoad);
+
+    if (srcLoad - destLoad <=
+        5 * std::max(metrics.bytesReadPerKSecond, SERVER_KNOBS->SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS)) {
+        traceEvent->detail("SkipReason", "TeamTooSimilar");
+        return false;
+    }
+    // Verify the shard is still in ShardsAffectedByTeamFailure
+    shards = self->shardsAffectedByTeamFailure->getShardsFor(
+        ShardsAffectedByTeamFailure::Team(sourceTeam->getServerIDs(), primary));
+    for (int i = 0; i < shards.size(); i++) {
+        if (metrics.keys == shards[i]) {
+            self->output.send(RelocateShard(metrics.keys.get(), priority, RelocateReason::REBALANCE_READ));
+            return true;
+        }
+    }
+    traceEvent->detail("SkipReason", "ShardNotPresent");
     return false;
 }
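The hunk above swaps the old "shuffle a sample and take the single hottest shard" logic for "request the top 100 shards, then pick one of them at random, retrying a bounded number of times if the pick is empty". Below is a self-contained sketch of that bounded-retry random selection pattern; std::mt19937 stands in for deterministicRandom() and a small struct stands in for StorageMetrics (both substitutions are assumptions for illustration only):

```cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <random>
#include <vector>

struct Candidate {
    int64_t bytes = 0; // stand-in for StorageMetrics::bytes
};

int main() {
    // Pretend this is the top-K list returned by the metrics request.
    std::vector<Candidate> topShards = { { 0 }, { 4096 }, { 0 }, { 1024 } };
    const int maxRetries = 8; // plays the role of REBALANCE_MAX_RETRIES
    std::mt19937 rng(42);
    std::uniform_int_distribution<size_t> pick(0, topShards.size() - 1);

    // Pick a random candidate; retry a bounded number of times if it is empty.
    std::optional<size_t> chosen;
    for (int i = 0; i < maxRetries; ++i) {
        size_t idx = pick(rng);
        if (topShards[idx].bytes > 0) {
            chosen = idx;
            break;
        }
    }

    if (chosen)
        std::cout << "chose index " << *chosen << " with " << topShards[*chosen].bytes << " bytes\n";
    else
        std::cout << "no eligible shard after " << maxRetries << " retries\n";
    return 0;
}
```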
@@ -1594,11 +1605,11 @@ ACTOR static Future<bool> rebalanceTeams(DDQueueData* self,
     state int retries = 0;
     while (retries < SERVER_KNOBS->REBALANCE_MAX_RETRIES) {
         state KeyRange testShard = deterministicRandom()->randomChoice(shards);
-        StorageMetrics testMetrics =
+        std::vector<StorageMetrics> testMetrics =
             wait(brokenPromiseToNever(self->getShardMetrics.getReply(GetMetricsRequest(testShard))));
-        if (testMetrics.bytes > metrics.bytes) {
+        if (testMetrics[0].bytes > metrics.bytes) {
             moveShard = testShard;
-            metrics = testMetrics;
+            metrics = testMetrics[0];
             if (metrics.bytes > averageShardBytes) {
                 break;
             }
@@ -836,7 +836,9 @@ ACTOR Future<Void> fetchShardMetrics_impl(DataDistributionTracker* self, GetMetr
     try {
         loop {
             Future<Void> onChange;
-            StorageMetrics returnMetrics;
+            std::vector<StorageMetrics> returnMetrics;
+            if (!req.comparator.present())
+                returnMetrics.push_back(StorageMetrics());
             for (auto range : req.keys) {
                 StorageMetrics metrics;
                 for (auto t : self->shards.intersectingRanges(range)) {
@@ -854,20 +856,25 @@ ACTOR Future<Void> fetchShardMetrics_impl(DataDistributionTracker* self, GetMetr
                 }

                 if (req.comparator.present()) {
-                    if (req.comparator.get()(returnMetrics, metrics)) {
-                        returnMetrics = metrics;
-                        returnMetrics.keys = range;
-                    }
+                    returnMetrics.push_back(metrics);
                 } else {
-                    returnMetrics += metrics;
+                    returnMetrics[0] += metrics;
                 }
             }

             if (!onChange.isValid()) {
-                req.reply.send(returnMetrics);
+                if (req.topK >= returnMetrics.size())
+                    req.reply.send(returnMetrics);
+                else if (req.comparator.present()) {
+                    std::nth_element(returnMetrics.begin(),
+                                     returnMetrics.end() - req.topK,
+                                     returnMetrics.end(),
+                                     req.comparator.get());
+                    req.reply.send(
+                        std::vector<StorageMetrics>(returnMetrics.rbegin(), returnMetrics.rbegin() + req.topK));
+                }
                 return Void();
             }

             wait(onChange);
         }
     } catch (Error& e) {
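Note on the selection above: std::nth_element with the request's less-than comparator only partitions the vector so that the topK largest entries occupy the last topK slots; those entries are not sorted among themselves, so the reply built from rbegin() carries the top-K set in unspecified order. A self-contained sketch of the same selection, using plain ints in place of StorageMetrics:

```cpp
#include <algorithm>
#include <iostream>
#include <vector>

int main() {
    std::vector<int> load = { 7, 42, 3, 19, 25, 1, 30 };
    const int topK = 3;
    auto less = [](int a, int b) { return a < b; };

    // After this call the 3 largest values sit in the last 3 slots, in no particular order.
    std::nth_element(load.begin(), load.end() - topK, load.end(), less);

    // Reverse-iterate over the tail to collect the top-K set: {25, 30, 42} in some order.
    std::vector<int> result(load.rbegin(), load.rbegin() + topK);

    for (int v : result)
        std::cout << v << " ";
    std::cout << "\n";
    return 0;
}
```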
@@ -884,7 +891,7 @@ ACTOR Future<Void> fetchShardMetrics(DataDistributionTracker* self, GetMetricsRe
             TEST(true); // DD_SHARD_METRICS_TIMEOUT
             StorageMetrics largeMetrics;
             largeMetrics.bytes = getMaxShardSize(self->dbSizeEstimate->get());
-            req.reply.send(largeMetrics);
+            req.reply.send(std::vector<StorageMetrics>(1, largeMetrics));
         }
     }
     return Void();
@@ -931,7 +938,9 @@ ACTOR Future<Void> fetchShardMetricsList_impl(DataDistributionTracker* self, Get
 ACTOR Future<Void> fetchShardMetricsList(DataDistributionTracker* self, GetMetricsListRequest req) {
     choose {
         when(wait(fetchShardMetricsList_impl(self, req))) {}
-        when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) { req.reply.sendError(timed_out()); }
+        when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) {
+            req.reply.sendError(timed_out());
+        }
     }
     return Void();
 }