remove duplicate code

This commit is contained in:
Xiaoxi Wang 2022-07-29 10:04:14 -07:00
parent cfc13e7018
commit d8323820f1
1 changed files with 19 additions and 29 deletions

View File

@ -1964,33 +1964,19 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
traceEvent.detail("QueuedRelocations", self->priority_relocations[ddPriority]);
if (self->priority_relocations[ddPriority] < SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
if (isDataMovementForMountainChopper(reason)) {
srcReq = GetTeamRequest(WantNewServers::True,
WantTrueBest::True,
PreferLowerDiskUtil::False,
TeamMustHaveShards::True,
ForReadBalance(readRebalance),
PreferLowerReadUtil::False);
destReq = GetTeamRequest(WantNewServers::True,
WantTrueBest::False,
PreferLowerDiskUtil::True,
TeamMustHaveShards::False,
ForReadBalance(readRebalance),
PreferLowerReadUtil::True);
} else {
srcReq = GetTeamRequest(WantNewServers::True,
WantTrueBest::False,
PreferLowerDiskUtil::False,
TeamMustHaveShards::True,
ForReadBalance(readRebalance),
PreferLowerReadUtil::False);
destReq = GetTeamRequest(WantNewServers::True,
WantTrueBest::True,
PreferLowerDiskUtil::True,
TeamMustHaveShards::False,
ForReadBalance(readRebalance),
PreferLowerReadUtil::True);
}
auto mcMove = isDataMovementForMountainChopper(reason);
srcReq = GetTeamRequest(WantNewServers::True,
WantTrueBest(mcMove),
PreferLowerDiskUtil::False,
TeamMustHaveShards::True,
ForReadBalance(readRebalance),
PreferLowerReadUtil::False);
destReq = GetTeamRequest(WantNewServers::True,
WantTrueBest(!mcMove),
PreferLowerDiskUtil::True,
TeamMustHaveShards::False,
ForReadBalance(readRebalance),
PreferLowerReadUtil::True);
state Future<SrcDestTeamPair> getTeamFuture =
getSrcDestTeams(self, teamCollectionIndex, srcReq, destReq, ddPriority, &traceEvent);
wait(ready(getTeamFuture));
@ -2336,7 +2322,9 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
debug_setCheckRelocationDuration(false);
}
}
when(KeyRange done = waitNext(rangesComplete.getFuture())) { keysToLaunchFrom = done; }
when(KeyRange done = waitNext(rangesComplete.getFuture())) {
keysToLaunchFrom = done;
}
when(wait(recordMetrics)) {
Promise<int64_t> req;
getAverageShardBytes.send(req);
@ -2383,7 +2371,9 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
}
when(wait(self.error.getFuture())) {} // Propagate errors from dataDistributionRelocator
when(wait(waitForAll(balancingFutures))) {}
when(Promise<int> r = waitNext(getUnhealthyRelocationCount)) { r.send(self.unhealthyRelocations); }
when(Promise<int> r = waitNext(getUnhealthyRelocationCount)) {
r.send(self.unhealthyRelocations);
}
}
}
} catch (Error& e) {