completeDest

This commit is contained in:
Xiaoxi Wang 2022-04-21 21:23:42 -07:00
parent 9f1dceba26
commit f9e57396e2
1 changed files with 16 additions and 6 deletions

View File

@ -386,15 +386,19 @@ void launchDest(RelocateData& relocation,
}
}
void completeDest(RelocateData const& relocation, std::map<UID, Busyness>& destBusymap) {
int destWorkFactor = getDestWorkFactor();
for (UID id : relocation.completeDests) {
destBusymap[id].removeWork(relocation.priority, destWorkFactor);
}
}
void complete(RelocateData const& relocation, std::map<UID, Busyness>& busymap, std::map<UID, Busyness>& destBusymap) {
ASSERT(relocation.workFactor > 0);
for (int i = 0; i < relocation.src.size(); i++)
busymap[relocation.src[i]].removeWork(relocation.priority, relocation.workFactor);
int destWorkFactor = getDestWorkFactor();
for (UID id : relocation.completeDests) {
destBusymap[id].removeWork(relocation.priority, destWorkFactor);
}
completeDest(relocation, destBusymap);
}
ACTOR Future<Void> dataDistributionRelocator(struct DDQueueData* self,
@ -1389,6 +1393,8 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
} else {
TEST(true); // move to removed server
healthyDestinations.addDataInFlightToTeam(-metrics.bytes);
completeDest(rd, self->destBusymap);
rd.completeDests.clear();
wait(delay(SERVER_KNOBS->RETRY_RELOCATESHARD_DELAY, TaskPriority::DataDistributionLaunch));
}
@ -1808,7 +1814,9 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
debug_setCheckRelocationDuration(false);
}
}
when(KeyRange done = waitNext(rangesComplete.getFuture())) { keysToLaunchFrom = done; }
when(KeyRange done = waitNext(rangesComplete.getFuture())) {
keysToLaunchFrom = done;
}
when(wait(recordMetrics)) {
Promise<int64_t> req;
getAverageShardBytes.send(req);
@ -1851,7 +1859,9 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
}
when(wait(self.error.getFuture())) {} // Propagate errors from dataDistributionRelocator
when(wait(waitForAll(balancingFutures))) {}
when(Promise<int> r = waitNext(getUnhealthyRelocationCount)) { r.send(self.unhealthyRelocations); }
when(Promise<int> r = waitNext(getUnhealthyRelocationCount)) {
r.send(self.unhealthyRelocations);
}
}
}
} catch (Error& e) {