fix promise bug

This commit is contained in:
Xiaoxi Wang 2021-05-25 20:25:21 +00:00
parent f11b7ffa5f
commit e9a23840ea
1 changed files with 23 additions and 19 deletions

View File

@ -3840,7 +3840,7 @@ ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection
} }
// Iterator over each storage process to do storage wiggle // Iterator over each storage process to do storage wiggle
ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncTrigger* stopSignal, ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncTrigger* stopSignal,
AsyncTrigger* finishStorageWiggleSignal, FutureStream<Void> finishStorageWiggleSignal,
DDTeamCollection* teamCollection) { DDTeamCollection* teamCollection) {
state bool isWiggling = false; state bool isWiggling = false;
loop choose { loop choose {
@ -3851,7 +3851,7 @@ ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncTrigger* stopSignal,
wait(updateNextWigglingStoragePID(teamCollection, &isWiggling)); wait(updateNextWigglingStoragePID(teamCollection, &isWiggling));
} }
} }
when(wait(finishStorageWiggleSignal->onTrigger())) { when(waitNext(finishStorageWiggleSignal)) {
isWiggling = true; isWiggling = true;
wait(updateNextWigglingStoragePID(teamCollection, &isWiggling)); wait(updateNextWigglingStoragePID(teamCollection, &isWiggling));
} }
@ -3860,15 +3860,16 @@ ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncTrigger* stopSignal,
return Void(); return Void();
} }
ACTOR Future<Future<Void>> watchPerpetualStoragePIDChange(Database cx, Promise<Value> pid) { ACTOR Future<std::pair<Future<Void>, Value>> watchPerpetualStoragePIDChange(Database cx) {
state ReadYourWritesTransaction tr(cx); state ReadYourWritesTransaction tr(cx);
state Future<Void> watchFuture; state Future<Void> watchFuture;
state Value ret;
loop { loop {
try { try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Optional<Standalone<StringRef>> value = wait(tr.get(wigglingStorageServerKey)); Optional<Value> value = wait(tr.get(wigglingStorageServerKey));
if (value.present()) { if (value.present()) {
pid.send(value.get()); ret = value.get();
} }
watchFuture = tr.watch(wigglingStorageServerKey); watchFuture = tr.watch(wigglingStorageServerKey);
wait(tr.commit()); wait(tr.commit());
@ -3877,28 +3878,27 @@ ACTOR Future<Future<Void>> watchPerpetualStoragePIDChange(Database cx, Promise<V
wait(tr.onError(e)); wait(tr.onError(e));
} }
} }
return watchFuture; return std::make_pair(watchFuture, ret);
} }
// Watch the value of current wiggling storage process and do wiggling works // Watch the value of current wiggling storage process and do wiggling works
ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal, ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
AsyncTrigger* finishStorageWiggleSignal, PromiseStream<Void> finishStorageWiggleSignal,
DDTeamCollection* self, DDTeamCollection* self,
const DDEnabledState* ddEnabledState) { const DDEnabledState* ddEnabledState) {
state Promise<Value> pidPromise; state Value pid;
state Future<Void> watchFuture; state Future<Void> watchFuture;
state Future<Void> moveFinishFuture = Never(); state Future<Void> moveFinishFuture = Never();
state Debouncer pauseWiggle(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY); state Debouncer pauseWiggle(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY);
state AsyncTrigger restart; state AsyncTrigger restart;
state Future<Void> ddQueueCheck = delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow); state Future<Void> ddQueueCheck = delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow);
state Value pid; state std::pair<Future<Void>, Value> res = wait(watchPerpetualStoragePIDChange(self->cx));
wait(store(watchFuture, watchPerpetualStoragePIDChange(self->cx, pidPromise))); watchFuture = res.first;
pid = std::move(res.second);
loop choose { loop choose {
when(wait(stopSignal->onTrigger())) { break; } when(wait(stopSignal->onTrigger())) { break; }
when(wait(watchFuture)) { wait(store(watchFuture, watchPerpetualStoragePIDChange(self->cx, pidPromise))); } when(wait(watchFuture)) {
when(wait(store(pid, pidPromise.getFuture()))) {
pidPromise.reset();
if (self->healthyTeamCount <= 1) { // pre-check health status if (self->healthyTeamCount <= 1) { // pre-check health status
pauseWiggle.trigger(); pauseWiggle.trigger();
} else { } else {
@ -3908,7 +3908,11 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
.detail("ProcessId", pid) .detail("ProcessId", pid)
.detail("StorageCount", fv.size()); .detail("StorageCount", fv.size());
} }
}
wait(store(res, watchPerpetualStoragePIDChange(self->cx)));
watchFuture = res.first;
pid = std::move(res.second);
}
when(wait(restart.onTrigger())) { when(wait(restart.onTrigger())) {
auto fv = self->excludeStorageWigglingServers(pid); auto fv = self->excludeStorageWigglingServers(pid);
moveFinishFuture = waitForAll(fv); moveFinishFuture = waitForAll(fv);
@ -3919,7 +3923,7 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
when(wait(moveFinishFuture)) { when(wait(moveFinishFuture)) {
moveFinishFuture = Never(); moveFinishFuture = Never();
self->includeStorageWigglingServers(pid); self->includeStorageWigglingServers(pid);
finishStorageWiggleSignal->trigger(); finishStorageWiggleSignal.send(Void());
TraceEvent("PerpetualStorageWiggleFinish", self->distributorId).detail("ProcessId", pid); TraceEvent("PerpetualStorageWiggleFinish", self->distributorId).detail("ProcessId", pid);
pid = Value(); pid = Value();
} }
@ -3955,7 +3959,7 @@ ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio
const DDEnabledState* ddEnabledState) { const DDEnabledState* ddEnabledState) {
state int speed = 0; state int speed = 0;
state AsyncTrigger stopWiggleSignal; state AsyncTrigger stopWiggleSignal;
state AsyncTrigger finishStorageWiggleSignal; state PromiseStream<Void> finishStorageWiggleSignal;
state SignalableActorCollection collection; state SignalableActorCollection collection;
loop { loop {
@ -3974,10 +3978,10 @@ ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio
ASSERT(speed == 1 || speed == 0); ASSERT(speed == 1 || speed == 0);
if (speed == 1) { if (speed == 1) {
collection.add( collection.add(
perpetualStorageWiggleIterator(&stopWiggleSignal, &finishStorageWiggleSignal, teamCollection)); perpetualStorageWiggleIterator(&stopWiggleSignal, finishStorageWiggleSignal.getFuture(), teamCollection));
collection.add(perpetualStorageWiggler( collection.add(perpetualStorageWiggler(
&stopWiggleSignal, &finishStorageWiggleSignal, teamCollection, ddEnabledState)); &stopWiggleSignal, finishStorageWiggleSignal, teamCollection, ddEnabledState));
finishStorageWiggleSignal.trigger(); finishStorageWiggleSignal.send(Void());
TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId); TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId);
} else { } else {
stopWiggleSignal.trigger(); stopWiggleSignal.trigger();