diff --git a/fdbserver/DDRelocationQueue.actor.cpp b/fdbserver/DDRelocationQueue.actor.cpp index bfb3f31101..9cac7ebd2e 100644 --- a/fdbserver/DDRelocationQueue.actor.cpp +++ b/fdbserver/DDRelocationQueue.actor.cpp @@ -2261,200 +2261,6 @@ ACTOR Future BgDDLoadRebalance(DDQueue* self, int teamCollectionIndex, Dat } catch (Error& e) { // Log actor_cancelled because it's not legal to suppress an event that's initialized traceEvent.errorUnsuppressed(e); - traceEvent.log(); - throw; - } - - traceEvent.detail("Moved", moved); - traceEvent.log(); - } -} - -ACTOR Future BgDDMountainChopper(DDQueue* self, int teamCollectionIndex) { - state double rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL; - state Transaction tr(self->cx); - state double lastRead = 0; - state bool skipCurrentLoop = false; - loop { - state std::pair>, bool> randomTeam; - state bool moved = false; - state TraceEvent traceEvent("BgDDMountainChopper_Old", self->distributorId); - traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval).detail("Rebalance", "Disk"); - - try { - state Future delayF = delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch); - if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) { - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); - Optional val = wait(tr.get(rebalanceDDIgnoreKey)); - lastRead = now(); - if (!val.present()) { - // reset loop interval - if (skipCurrentLoop) { - rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL; - } - skipCurrentLoop = false; - } else { - // NOTE: check special value "" and "on" might written in old version < 7.2 - if (val.get().size() > 0 && val.get() != "on"_sr) { - int ddIgnore = BinaryReader::fromStringRef(val.get(), Unversioned()); - skipCurrentLoop = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0; - } else { - skipCurrentLoop = true; - } - } - } - - traceEvent.detail("Enabled", !skipCurrentLoop); - - wait(delayF); - if (skipCurrentLoop) { - // set loop interval to avoid busy wait here. - rebalancePollingInterval = - std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL); - tr.reset(); - continue; - } - - traceEvent.detail("QueuedRelocations", - self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM]); - if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] < - SERVER_KNOBS->DD_REBALANCE_PARALLELISM) { - std::pair>, bool> _randomTeam = - wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply( - GetTeamRequest(WantNewServers::True, - WantTrueBest::False, - PreferLowerDiskUtil::True, - TeamMustHaveShards::False)))); - randomTeam = _randomTeam; - traceEvent.detail("DestTeam", - printable(randomTeam.first.map( - [](const Reference& team) { return team->getDesc(); }))); - - if (randomTeam.first.present()) { - std::pair>, bool> loadedTeam = - wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply( - GetTeamRequest(WantNewServers::True, - WantTrueBest::True, - PreferLowerDiskUtil::False, - TeamMustHaveShards::True)))); - - traceEvent.detail( - "SourceTeam", - printable(loadedTeam.first.map( - [](const Reference& team) { return team->getDesc(); }))); - - if (loadedTeam.first.present()) { - bool _moved = wait(rebalanceTeams(self, - DataMovementReason::REBALANCE_OVERUTILIZED_TEAM, - loadedTeam.first.get(), - randomTeam.first.get(), - teamCollectionIndex == 0, - &traceEvent)); - moved = _moved; - } - } - } - - tr.reset(); - } catch (Error& e) { - // Log actor_cancelled because it's not legal to suppress an event that's initialized - traceEvent.errorUnsuppressed(e); - wait(tr.onError(e)); - } - - traceEvent.detail("Moved", moved); - traceEvent.log(); - } -} - -ACTOR Future BgDDValleyFiller(DDQueue* self, int teamCollectionIndex) { - state double rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL; - state double lastRead = 0; - state bool skipCurrentLoop = false; - - loop { - state std::pair>, bool> randomTeam; - state bool moved = false; - state TraceEvent traceEvent("BgDDValleyFiller_Old", self->distributorId); - - wait(delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch)); - - traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval).detail("Rebalance", "Disk"); - - if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) { - Optional val = wait(self->txnProcessor->readRebalanceDDIgnoreKey()); - lastRead = now(); - if (!val.present()) { - // reset loop interval - if (skipCurrentLoop) { - rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL; - } - skipCurrentLoop = false; - } else { - // NOTE: check special value "" and "on" might written in old version < 7.2 - if (val.get().size() > 0 && val.get() != "on"_sr) { - int ddIgnore = BinaryReader::fromStringRef(val.get(), Unversioned()); - skipCurrentLoop = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0; - } else { - skipCurrentLoop = true; - } - } - } - - traceEvent.detail("Enabled", !skipCurrentLoop); - if (skipCurrentLoop) { - // set loop interval to avoid busy wait here. - rebalancePollingInterval = - std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL); - continue; - } - - try { - traceEvent.detail("QueuedRelocations", - self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM]); - if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] < - SERVER_KNOBS->DD_REBALANCE_PARALLELISM) { - std::pair>, bool> _randomTeam = - wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply( - GetTeamRequest(WantNewServers::True, - WantTrueBest::False, - PreferLowerDiskUtil::False, - TeamMustHaveShards::True)))); - randomTeam = _randomTeam; - traceEvent.detail("SourceTeam", - printable(randomTeam.first.map( - [](const Reference& team) { return team->getDesc(); }))); - - if (randomTeam.first.present()) { - std::pair>, bool> unloadedTeam = - wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply( - GetTeamRequest(WantNewServers::True, - WantTrueBest::True, - PreferLowerDiskUtil::True, - TeamMustHaveShards::False)))); - - traceEvent.detail( - "DestTeam", - printable(unloadedTeam.first.map( - [](const Reference& team) { return team->getDesc(); }))); - - if (unloadedTeam.first.present()) { - bool _moved = wait(rebalanceTeams(self, - DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM, - randomTeam.first.get(), - unloadedTeam.first.get(), - teamCollectionIndex == 0, - &traceEvent)); - moved = _moved; - } - } - } - } catch (Error& e) { - // Log actor_cancelled because it's not legal to suppress an event that's initialized - traceEvent.errorUnsuppressed(e); - traceEvent.detail("Moved", moved); - traceEvent.log(); throw; } @@ -2504,15 +2310,12 @@ ACTOR Future dataDistributionQueue(std::shared_ptr dbProc state Future launchQueuedWorkTimeout = Never(); for (int i = 0; i < teamCollections.size(); i++) { - // FIXME: Use BgDDLoadBalance for disk rebalance too after DD simulation test proof. - // ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_OVERUTILIZED_TEAM)); - // ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM)); + ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_OVERUTILIZED_TEAM)); + ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM)); if (SERVER_KNOBS->READ_SAMPLING_ENABLED) { ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_READ_OVERUTIL_TEAM)); ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_READ_UNDERUTIL_TEAM)); } - ddQueueFutures.push_back(BgDDMountainChopper(&self, i)); - ddQueueFutures.push_back(BgDDValleyFiller(&self, i)); } ddQueueFutures.push_back(delayedAsyncVar(self.rawProcessingUnhealthy, processingUnhealthy, 0)); ddQueueFutures.push_back(delayedAsyncVar(self.rawProcessingWiggle, processingWiggle, 0));