add canQueue
This commit is contained in:
parent
25e1e75d9e
commit
131adec811
|
@ -1072,6 +1072,12 @@ struct DDQueueData {
|
|||
}
|
||||
return highestPriority;
|
||||
}
|
||||
|
||||
bool canQueue(const std::vector<UID>& ids) const {
|
||||
return std::all_of(ids.begin(), ids.end(), [this](const UID& id) {
|
||||
return this->queue.count(id) == 0 || this->queue.at(id).size() < 3 ; // == RELOCATION_PARALLELISM_PER_SOURCE_SERVER + 1
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// return -1 if a.readload > b.readload, usually for choose dest team with low read load
|
||||
|
@ -1742,14 +1748,22 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
|
|||
}
|
||||
// clang-format off
|
||||
wait(getSrcDestTeams(self, teamCollectionIndex, srcReq, destReq, &sourceTeam, &destTeam,ddPriority,&traceEvent));
|
||||
// clang-format on
|
||||
if (sourceTeam.isValid() && destTeam.isValid()) {
|
||||
if (readRebalance) {
|
||||
wait(store(moved,rebalanceReadLoad(self,ddPriority, sourceTeam, destTeam,teamCollectionIndex == 0,&traceEvent)));
|
||||
// check can queue for src server
|
||||
if (self->canQueue(sourceTeam->getServerIDs())) {
|
||||
wait(store(
|
||||
moved,
|
||||
rebalanceReadLoad(
|
||||
self, ddPriority, sourceTeam, destTeam, teamCollectionIndex == 0, &traceEvent)));
|
||||
}
|
||||
} else {
|
||||
wait(store(moved,rebalanceTeams(self,ddPriority, sourceTeam, destTeam,teamCollectionIndex == 0,&traceEvent)));
|
||||
wait(store(moved,
|
||||
rebalanceTeams(
|
||||
self, ddPriority, sourceTeam, destTeam, teamCollectionIndex == 0, &traceEvent)));
|
||||
}
|
||||
}
|
||||
// clang-format on
|
||||
moved ? resetCount = 0 : resetCount++;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue