fix wiggler logic bug

This commit is contained in:
Xiaoxi Wang 2021-05-26 17:06:35 +00:00
parent 51b402fa04
commit ce308edc5e
1 changed files with 81 additions and 53 deletions

View File

@ -3842,7 +3842,10 @@ ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection
ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncTrigger* stopSignal,
FutureStream<Void> finishStorageWiggleSignal,
DDTeamCollection* teamCollection) {
state bool isWiggling = false;
state bool isWiggling = true;
// initialize PID
wait(updateNextWigglingStoragePID(teamCollection, &isWiggling));
loop choose {
when(wait(stopSignal->onTrigger())) { break; }
when(wait(teamCollection->canStartStorageWiggling.onTrigger())) {
@ -3860,7 +3863,7 @@ ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncTrigger* stopSignal,
return Void();
}
ACTOR Future<std::pair<Future<Void>, Value>> watchPerpetualStoragePIDChange(Database cx) {
ACTOR Future<std::pair<Future<Void>, Value>> watchPerpetualStoragePIDChange(Database cx) {
state ReadYourWritesTransaction tr(cx);
state Future<Void> watchFuture;
state Value ret;
@ -3890,64 +3893,90 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
state Future<Void> moveFinishFuture = Never();
state Debouncer pauseWiggle(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY);
state AsyncTrigger restart;
state Future<Void> ddQueueCheck = delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow);
state Future<Void> ddQueueCheck = delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY, TaskPriority::DataDistributionLow);
state int movingCount = 0;
state bool isPaused = false;
state std::pair<Future<Void>, Value> res = wait(watchPerpetualStoragePIDChange(self->cx));
watchFuture = res.first;
pid = std::move(res.second);
state std::pair<Future<Void>, Value> res = wait(watchPerpetualStoragePIDChange(self->cx));
watchFuture = res.first;
pid = std::move(res.second);
loop choose {
when(wait(stopSignal->onTrigger())) { break; }
when(wait(watchFuture)) {
if (self->healthyTeamCount <= 1) { // pre-check health status
pauseWiggle.trigger();
} else {
// start with the initial pid
if (self->healthyTeamCount > 1) { // pre-check health status
auto fv = self->excludeStorageWigglingServers(pid);
movingCount = fv.size();
moveFinishFuture = waitForAll(fv);
TraceEvent("PerpetualStorageWiggleInitialStart", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", movingCount);
} else {
isPaused = true;
TraceEvent("PerpetualStorageWiggleInitialPause", self->distributorId).detail("ProcessId", pid);
}
loop {
choose {
when(wait(stopSignal->onTrigger())) { break; }
when(wait(watchFuture)) {
// read new pid and set the next watch Future
wait(store(res, watchPerpetualStoragePIDChange(self->cx)));
watchFuture = res.first;
pid = std::move(res.second);
if (self->healthyTeamCount <= 1) { // pre-check health status
pauseWiggle.trigger();
}
else {
auto fv = self->excludeStorageWigglingServers(pid);
movingCount = fv.size();
moveFinishFuture = waitForAll(fv);
TraceEvent("PerpetualStorageWiggleStart", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", movingCount);
}
}
when(wait(restart.onTrigger())) {
auto fv = self->excludeStorageWigglingServers(pid);
moveFinishFuture = waitForAll(fv);
TraceEvent("PerpetualStorageWiggleStart", self->distributorId)
TraceEvent("PerpetualStorageWiggleRestart", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", fv.size());
isPaused = false;
}
when(wait(moveFinishFuture)) {
moveFinishFuture = Never();
self->includeStorageWigglingServers(pid);
TraceEvent("PerpetualStorageWiggleFinish", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", movingCount);
pid = Value();
finishStorageWiggleSignal.send(Void());
}
when(wait(self->zeroHealthyTeams->onChange())) {
if (self->zeroHealthyTeams->get() && !isPaused) {
pauseWiggle.trigger();
}
}
when(wait(ddQueueCheck)) {
Promise<int> countp;
self->getUnhealthyRelocationCount.send(countp);
int count = wait(countp.getFuture());
wait(store(res, watchPerpetualStoragePIDChange(self->cx)));
watchFuture = res.first;
pid = std::move(res.second);
}
when(wait(restart.onTrigger())) {
auto fv = self->excludeStorageWigglingServers(pid);
moveFinishFuture = waitForAll(fv);
TraceEvent("PerpetualStorageWiggleRestart", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", fv.size());
}
when(wait(moveFinishFuture)) {
moveFinishFuture = Never();
self->includeStorageWigglingServers(pid);
finishStorageWiggleSignal.send(Void());
TraceEvent("PerpetualStorageWiggleFinish", self->distributorId).detail("ProcessId", pid);
pid = Value();
}
when(wait(self->zeroHealthyTeams->onChange())) {
if (self->zeroHealthyTeams->get()) {
pauseWiggle.trigger();
if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && !isPaused) {
pauseWiggle.trigger();
} else if (count < SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && self->healthyTeamCount > 1) {
restart.trigger();
}
ddQueueCheck = delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY, TaskPriority::DataDistributionLow);
}
}
when(wait(ddQueueCheck)) {
Promise<int> countp;
self->getUnhealthyRelocationCount.send(countp);
int count = wait(countp.getFuture());
if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD) {
pauseWiggle.trigger();
} else if (count < SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && self->healthyTeamCount > 1) {
restart.trigger();
when(wait(pauseWiggle.onTrigger())) {
isPaused = true;
moveFinishFuture = Never();
self->includeStorageWigglingServers(pid);
TraceEvent("PerpetualStorageWigglePause", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", movingCount);
}
ddQueueCheck = delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow);
}
when(wait(pauseWiggle.onTrigger())) {
moveFinishFuture = Never();
self->includeStorageWigglingServers(pid);
TraceEvent("PerpetualStorageWigglePause", self->distributorId).detail("ProcessId", pid);
}
}
@ -3977,11 +4006,10 @@ ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio
ASSERT(speed == 1 || speed == 0);
if (speed == 1) {
collection.add(
perpetualStorageWiggleIterator(&stopWiggleSignal, finishStorageWiggleSignal.getFuture(), teamCollection));
collection.add(perpetualStorageWiggleIterator(
&stopWiggleSignal, finishStorageWiggleSignal.getFuture(), teamCollection));
collection.add(perpetualStorageWiggler(
&stopWiggleSignal, finishStorageWiggleSignal, teamCollection, ddEnabledState));
finishStorageWiggleSignal.send(Void());
TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId);
} else {
stopWiggleSignal.trigger();