code refactor;change stopSignal;
This commit is contained in:
parent
f257d8e994
commit
fdd9c30794
|
@ -3936,13 +3936,18 @@ ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection
|
|||
// Iterate over each storage process to do storage wiggle. After initializing the first Process ID, it waits a signal
|
||||
// from `perpetualStorageWiggler` indicating the wiggling of current process is finished. Then it writes the next
|
||||
// Process ID to a system key: `wigglingStorageServerKey` to show the next process to wiggle.
|
||||
ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncTrigger* stopSignal,
|
||||
ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncVar<bool>* stopSignal,
|
||||
FutureStream<Void> finishStorageWiggleSignal,
|
||||
DDTeamCollection* teamCollection) {
|
||||
loop choose {
|
||||
when(wait(stopSignal->onTrigger())) { break; }
|
||||
loop {
|
||||
choose {
|
||||
when(wait(stopSignal->onChange())) {}
|
||||
when(waitNext(finishStorageWiggleSignal)) { wait(updateNextWigglingStoragePID(teamCollection)); }
|
||||
}
|
||||
if (stopSignal->get()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -3993,7 +3998,7 @@ ACTOR Future<Void> clusterHealthCheckForPerpetualWiggle(DDTeamCollection* self)
|
|||
// Process Id is “pid” into excludeServers which prevent recruiting the wiggling storage servers and let teamTracker
|
||||
// start to move data off the affected teams. The wiggling process of current storage servers will be paused if the
|
||||
// cluster is unhealthy and restarted once the cluster is healthy again.
|
||||
ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
|
||||
ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
|
||||
PromiseStream<Void> finishStorageWiggleSignal,
|
||||
DDTeamCollection* self,
|
||||
const DDEnabledState* ddEnabledState) {
|
||||
|
@ -4033,22 +4038,21 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
|
|||
} else {
|
||||
TEST(true); // skip wiggling current process
|
||||
TraceEvent("PerpetualStorageWiggleSkip", self->distributorId).detail("ProcessId", pid.toString());
|
||||
|
||||
self->wigglingPid.reset();
|
||||
watchFuture = res.first;
|
||||
finishStorageWiggleSignal.send(Void());
|
||||
moveFinishFuture = Void();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
choose {
|
||||
when(wait(stopSignal->onTrigger())) { break; }
|
||||
when(wait(watchFuture)) {
|
||||
ASSERT(!self->wigglingPid.present()); // the previous wiggle must be finished
|
||||
watchFuture = Never();
|
||||
// read new pid and set the next watch Future
|
||||
wait(store(res, watchPerpetualStoragePIDChange(self)));
|
||||
self->wigglingPid = Optional<Key>(res.second);
|
||||
|
||||
// random delay
|
||||
wait(delayJittered(5.0, TaskPriority::DataDistributionLow));
|
||||
}
|
||||
when(wait(moveFinishFuture)) {
|
||||
TEST(true); // finish wiggling this process
|
||||
|
@ -4065,8 +4069,11 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
|
|||
watchFuture = res.first;
|
||||
finishStorageWiggleSignal.send(Void());
|
||||
}
|
||||
when(wait(ddQueueCheck)) {}
|
||||
when(wait(self->pauseWiggle->onChange())) {}
|
||||
when(wait(ddQueueCheck || self->pauseWiggle->onChange() || stopSignal->onChange())) {}
|
||||
}
|
||||
|
||||
if (stopSignal->get()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4086,10 +4093,9 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
|
|||
ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollection,
|
||||
const DDEnabledState* ddEnabledState) {
|
||||
state int speed = 0;
|
||||
state AsyncTrigger stopWiggleSignal;
|
||||
state AsyncVar<bool> stopWiggleSignal(false);
|
||||
state PromiseStream<Void> finishStorageWiggleSignal;
|
||||
state SignalableActorCollection collection;
|
||||
state bool started = false;
|
||||
teamCollection->pauseWiggle = makeReference<AsyncVar<bool>>(true);
|
||||
|
||||
loop {
|
||||
|
@ -4106,18 +4112,17 @@ ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio
|
|||
wait(tr.commit());
|
||||
|
||||
ASSERT(speed == 1 || speed == 0);
|
||||
if (speed == 1 && !started) {
|
||||
if (speed == 1 && stopWiggleSignal.get()) { // avoid duplicated start
|
||||
stopWiggleSignal.set(false);
|
||||
collection.add(perpetualStorageWiggleIterator(
|
||||
&stopWiggleSignal, finishStorageWiggleSignal.getFuture(), teamCollection));
|
||||
collection.add(perpetualStorageWiggler(
|
||||
&stopWiggleSignal, finishStorageWiggleSignal, teamCollection, ddEnabledState));
|
||||
TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId);
|
||||
started = true;
|
||||
} else if (speed == 0 && started) {
|
||||
stopWiggleSignal.trigger();
|
||||
} else if (speed == 0 && !stopWiggleSignal.get()) {
|
||||
stopWiggleSignal.set(true);
|
||||
wait(collection.signalAndReset());
|
||||
TraceEvent("PerpetualStorageWiggleClose", teamCollection->distributorId);
|
||||
started = false;
|
||||
teamCollection->pauseWiggle->set(true);
|
||||
}
|
||||
wait(watchFuture);
|
||||
|
|
Loading…
Reference in New Issue