diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index b5083b0a70..0f167940df 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -3840,7 +3840,7 @@ ACTOR Future updateNextWigglingStoragePID(DDTeamCollection* teamCollection } // Iterator over each storage process to do storage wiggle ACTOR Future perpetualStorageWiggleIterator(AsyncTrigger* stopSignal, - AsyncTrigger* finishStorageWiggleSignal, + FutureStream finishStorageWiggleSignal, DDTeamCollection* teamCollection) { state bool isWiggling = false; loop choose { @@ -3851,7 +3851,7 @@ ACTOR Future perpetualStorageWiggleIterator(AsyncTrigger* stopSignal, wait(updateNextWigglingStoragePID(teamCollection, &isWiggling)); } } - when(wait(finishStorageWiggleSignal->onTrigger())) { + when(waitNext(finishStorageWiggleSignal)) { isWiggling = true; wait(updateNextWigglingStoragePID(teamCollection, &isWiggling)); } @@ -3860,15 +3860,16 @@ ACTOR Future perpetualStorageWiggleIterator(AsyncTrigger* stopSignal, return Void(); } -ACTOR Future> watchPerpetualStoragePIDChange(Database cx, Promise pid) { +ACTOR Future, Value>> watchPerpetualStoragePIDChange(Database cx) { state ReadYourWritesTransaction tr(cx); state Future watchFuture; + state Value ret; loop { try { tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - Optional> value = wait(tr.get(wigglingStorageServerKey)); + Optional value = wait(tr.get(wigglingStorageServerKey)); if (value.present()) { - pid.send(value.get()); + ret = value.get(); } watchFuture = tr.watch(wigglingStorageServerKey); wait(tr.commit()); @@ -3877,28 +3878,27 @@ ACTOR Future> watchPerpetualStoragePIDChange(Database cx, Promise perpetualStorageWiggler(AsyncTrigger* stopSignal, - AsyncTrigger* finishStorageWiggleSignal, + PromiseStream finishStorageWiggleSignal, DDTeamCollection* self, const DDEnabledState* ddEnabledState) { - state Promise pidPromise; + state Value pid; state Future watchFuture; state Future moveFinishFuture = Never(); state Debouncer pauseWiggle(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY); state AsyncTrigger restart; state Future ddQueueCheck = delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow); - state Value pid; - wait(store(watchFuture, watchPerpetualStoragePIDChange(self->cx, pidPromise))); + state std::pair, Value> res = wait(watchPerpetualStoragePIDChange(self->cx)); + watchFuture = res.first; + pid = std::move(res.second); loop choose { when(wait(stopSignal->onTrigger())) { break; } - when(wait(watchFuture)) { wait(store(watchFuture, watchPerpetualStoragePIDChange(self->cx, pidPromise))); } - when(wait(store(pid, pidPromise.getFuture()))) { - pidPromise.reset(); + when(wait(watchFuture)) { if (self->healthyTeamCount <= 1) { // pre-check health status pauseWiggle.trigger(); } else { @@ -3908,7 +3908,11 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, .detail("ProcessId", pid) .detail("StorageCount", fv.size()); } - } + + wait(store(res, watchPerpetualStoragePIDChange(self->cx))); + watchFuture = res.first; + pid = std::move(res.second); + } when(wait(restart.onTrigger())) { auto fv = self->excludeStorageWigglingServers(pid); moveFinishFuture = waitForAll(fv); @@ -3919,7 +3923,7 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, when(wait(moveFinishFuture)) { moveFinishFuture = Never(); self->includeStorageWigglingServers(pid); - finishStorageWiggleSignal->trigger(); + finishStorageWiggleSignal.send(Void()); TraceEvent("PerpetualStorageWiggleFinish", self->distributorId).detail("ProcessId", pid); pid = Value(); } @@ -3955,7 +3959,7 @@ ACTOR Future monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio const DDEnabledState* ddEnabledState) { state int speed = 0; state AsyncTrigger stopWiggleSignal; - state AsyncTrigger finishStorageWiggleSignal; + state PromiseStream finishStorageWiggleSignal; state SignalableActorCollection collection; loop { @@ -3974,10 +3978,10 @@ ACTOR Future monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio ASSERT(speed == 1 || speed == 0); if (speed == 1) { collection.add( - perpetualStorageWiggleIterator(&stopWiggleSignal, &finishStorageWiggleSignal, teamCollection)); + perpetualStorageWiggleIterator(&stopWiggleSignal, finishStorageWiggleSignal.getFuture(), teamCollection)); collection.add(perpetualStorageWiggler( - &stopWiggleSignal, &finishStorageWiggleSignal, teamCollection, ddEnabledState)); - finishStorageWiggleSignal.trigger(); + &stopWiggleSignal, finishStorageWiggleSignal, teamCollection, ddEnabledState)); + finishStorageWiggleSignal.send(Void()); TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId); } else { stopWiggleSignal.trigger();