Move the canQueue check, allow one read rebalance per source server every 60s, and randomly select a portion of shards
This commit is contained in:
parent
131adec811
commit
af9e5ba885
|
@ -903,6 +903,12 @@ struct DDQueueData {
|
|||
// logRelocation( results, "GotSourceServers" );
|
||||
|
||||
fetchingSourcesQueue.erase(results);
|
||||
|
||||
// When doing read rebalance, to avoid choosing the hottest team many times within one traffic sample period, discard this relocation request if there are too many shards in the queue or the last shard was appended less than 1 minute ago.
|
||||
if(results.reason == RelocateReason::REBALANCE_READ && !canQueue(results.src)) {
|
||||
return;
|
||||
}
|
||||
|
||||
queueMap.insert(results.keys, results);
|
||||
for (int i = 0; i < results.src.size(); i++) {
|
||||
queue[results.src[i]].insert(results);
|
||||
|
@ -1075,7 +1081,10 @@ struct DDQueueData {
|
|||
|
||||
bool canQueue(const std::vector<UID>& ids) const {
|
||||
return std::all_of(ids.begin(), ids.end(), [this](const UID& id) {
|
||||
return this->queue.count(id) == 0 || this->queue.at(id).size() < 3 ; // == RELOCATION_PARALLELISM_PER_SOURCE_SERVER + 1
|
||||
if(this->queue.count(id) && this->queue.at(id).size()) {
|
||||
return now() - this->queue.at(id).rbegin()->startTime >= 60.0;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
@ -1512,12 +1521,17 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
|
|||
|
||||
state std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor(
|
||||
ShardsAffectedByTeamFailure::Team(sourceTeam->getServerIDs(), primary));
|
||||
if (!shards.size()) {
|
||||
if (shards.size() <= 1) {
|
||||
traceEvent->detail("SkipReason", "NoShardOnSource");
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO: set 1000 as a knob
|
||||
// randomly compare a portion of all shards
|
||||
int shuffleLen = std::min((int)(shards.size() * 0.67), 1000);
|
||||
deterministicRandom()->randomShuffle(shards, shuffleLen);
|
||||
state Future<HealthMetrics> healthMetrics = self->cx->getHealthMetrics(true);
|
||||
state GetMetricsRequest req(shards);
|
||||
state GetMetricsRequest req(std::vector<KeyRange>(shards.begin(), shards.begin() + shuffleLen));
|
||||
req.comparator = [](const StorageMetrics& a, const StorageMetrics& b) {
|
||||
return a.bytesReadPerKSecond / std::max(a.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES) <
|
||||
b.bytesReadPerKSecond / std::max(b.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES);
|
||||
|
@ -1748,22 +1762,14 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
|
|||
}
|
||||
// clang-format off
|
||||
wait(getSrcDestTeams(self, teamCollectionIndex, srcReq, destReq, &sourceTeam, &destTeam,ddPriority,&traceEvent));
|
||||
// clang-format on
|
||||
if (sourceTeam.isValid() && destTeam.isValid()) {
|
||||
if (readRebalance) {
|
||||
// check can queue for src server
|
||||
if (self->canQueue(sourceTeam->getServerIDs())) {
|
||||
wait(store(
|
||||
moved,
|
||||
rebalanceReadLoad(
|
||||
self, ddPriority, sourceTeam, destTeam, teamCollectionIndex == 0, &traceEvent)));
|
||||
}
|
||||
wait(store(moved,rebalanceReadLoad(self, ddPriority, sourceTeam, destTeam, teamCollectionIndex == 0, &traceEvent)));
|
||||
} else {
|
||||
wait(store(moved,
|
||||
rebalanceTeams(
|
||||
self, ddPriority, sourceTeam, destTeam, teamCollectionIndex == 0, &traceEvent)));
|
||||
wait(store(moved,rebalanceTeams(self, ddPriority, sourceTeam, destTeam, teamCollectionIndex == 0, &traceEvent)));
|
||||
}
|
||||
}
|
||||
// clang-format on
|
||||
moved ? resetCount = 0 : resetCount++;
|
||||
}
|
||||
|
||||
|
|
|
@ -156,8 +156,9 @@ public:
|
|||
}
|
||||
|
||||
template <class C>
|
||||
void randomShuffle(C& container) {
|
||||
int s = (int)container.size();
|
||||
void randomShuffle(C& container, int shuffleLen = -1) {
|
||||
int s = shuffleLen < 0 ? std::min(shuffleLen, (int)container.size()) : (int)container.size();
|
||||
|
||||
for (int i = 0; i < s; i++) {
|
||||
int j = randomInt(i, s);
|
||||
if (i != j) {
|
||||
|
|
Loading…
Reference in New Issue