replace BgDDMountainChopper and BgDDValleyFiller with BgDDLoadRebalance

This commit is contained in:
Xiaoxi Wang 2022-09-21 15:11:04 -07:00
parent 9358aea097
commit 5500ec8126
1 changed files with 2 additions and 199 deletions

View File

@ -2261,200 +2261,6 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueue* self, int teamCollectionIndex, Dat
} catch (Error& e) {
// Log actor_cancelled because it's not legal to suppress an event that's initialized
traceEvent.errorUnsuppressed(e);
traceEvent.log();
throw;
}
traceEvent.detail("Moved", moved);
traceEvent.log();
}
}
ACTOR Future<Void> BgDDMountainChopper(DDQueue* self, int teamCollectionIndex) {
state double rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
state Transaction tr(self->cx);
state double lastRead = 0;
state bool skipCurrentLoop = false;
loop {
state std::pair<Optional<Reference<IDataDistributionTeam>>, bool> randomTeam;
state bool moved = false;
state TraceEvent traceEvent("BgDDMountainChopper_Old", self->distributorId);
traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval).detail("Rebalance", "Disk");
try {
state Future<Void> delayF = delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch);
if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> val = wait(tr.get(rebalanceDDIgnoreKey));
lastRead = now();
if (!val.present()) {
// reset loop interval
if (skipCurrentLoop) {
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
}
skipCurrentLoop = false;
} else {
// NOTE: check special value "" and "on" might written in old version < 7.2
if (val.get().size() > 0 && val.get() != "on"_sr) {
int ddIgnore = BinaryReader::fromStringRef<uint8_t>(val.get(), Unversioned());
skipCurrentLoop = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0;
} else {
skipCurrentLoop = true;
}
}
}
traceEvent.detail("Enabled", !skipCurrentLoop);
wait(delayF);
if (skipCurrentLoop) {
// set loop interval to avoid busy wait here.
rebalancePollingInterval =
std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
tr.reset();
continue;
}
traceEvent.detail("QueuedRelocations",
self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM]);
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] <
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
std::pair<Optional<Reference<IDataDistributionTeam>>, bool> _randomTeam =
wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply(
GetTeamRequest(WantNewServers::True,
WantTrueBest::False,
PreferLowerDiskUtil::True,
TeamMustHaveShards::False))));
randomTeam = _randomTeam;
traceEvent.detail("DestTeam",
printable(randomTeam.first.map<std::string>(
[](const Reference<IDataDistributionTeam>& team) { return team->getDesc(); })));
if (randomTeam.first.present()) {
std::pair<Optional<Reference<IDataDistributionTeam>>, bool> loadedTeam =
wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply(
GetTeamRequest(WantNewServers::True,
WantTrueBest::True,
PreferLowerDiskUtil::False,
TeamMustHaveShards::True))));
traceEvent.detail(
"SourceTeam",
printable(loadedTeam.first.map<std::string>(
[](const Reference<IDataDistributionTeam>& team) { return team->getDesc(); })));
if (loadedTeam.first.present()) {
bool _moved = wait(rebalanceTeams(self,
DataMovementReason::REBALANCE_OVERUTILIZED_TEAM,
loadedTeam.first.get(),
randomTeam.first.get(),
teamCollectionIndex == 0,
&traceEvent));
moved = _moved;
}
}
}
tr.reset();
} catch (Error& e) {
// Log actor_cancelled because it's not legal to suppress an event that's initialized
traceEvent.errorUnsuppressed(e);
wait(tr.onError(e));
}
traceEvent.detail("Moved", moved);
traceEvent.log();
}
}
ACTOR Future<Void> BgDDValleyFiller(DDQueue* self, int teamCollectionIndex) {
state double rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
state double lastRead = 0;
state bool skipCurrentLoop = false;
loop {
state std::pair<Optional<Reference<IDataDistributionTeam>>, bool> randomTeam;
state bool moved = false;
state TraceEvent traceEvent("BgDDValleyFiller_Old", self->distributorId);
wait(delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch));
traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval).detail("Rebalance", "Disk");
if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
Optional<Value> val = wait(self->txnProcessor->readRebalanceDDIgnoreKey());
lastRead = now();
if (!val.present()) {
// reset loop interval
if (skipCurrentLoop) {
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
}
skipCurrentLoop = false;
} else {
// NOTE: check special value "" and "on" might written in old version < 7.2
if (val.get().size() > 0 && val.get() != "on"_sr) {
int ddIgnore = BinaryReader::fromStringRef<uint8_t>(val.get(), Unversioned());
skipCurrentLoop = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0;
} else {
skipCurrentLoop = true;
}
}
}
traceEvent.detail("Enabled", !skipCurrentLoop);
if (skipCurrentLoop) {
// set loop interval to avoid busy wait here.
rebalancePollingInterval =
std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
continue;
}
try {
traceEvent.detail("QueuedRelocations",
self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM]);
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] <
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
std::pair<Optional<Reference<IDataDistributionTeam>>, bool> _randomTeam =
wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply(
GetTeamRequest(WantNewServers::True,
WantTrueBest::False,
PreferLowerDiskUtil::False,
TeamMustHaveShards::True))));
randomTeam = _randomTeam;
traceEvent.detail("SourceTeam",
printable(randomTeam.first.map<std::string>(
[](const Reference<IDataDistributionTeam>& team) { return team->getDesc(); })));
if (randomTeam.first.present()) {
std::pair<Optional<Reference<IDataDistributionTeam>>, bool> unloadedTeam =
wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply(
GetTeamRequest(WantNewServers::True,
WantTrueBest::True,
PreferLowerDiskUtil::True,
TeamMustHaveShards::False))));
traceEvent.detail(
"DestTeam",
printable(unloadedTeam.first.map<std::string>(
[](const Reference<IDataDistributionTeam>& team) { return team->getDesc(); })));
if (unloadedTeam.first.present()) {
bool _moved = wait(rebalanceTeams(self,
DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM,
randomTeam.first.get(),
unloadedTeam.first.get(),
teamCollectionIndex == 0,
&traceEvent));
moved = _moved;
}
}
}
} catch (Error& e) {
// Log actor_cancelled because it's not legal to suppress an event that's initialized
traceEvent.errorUnsuppressed(e);
traceEvent.detail("Moved", moved);
traceEvent.log();
throw;
}
@ -2504,15 +2310,12 @@ ACTOR Future<Void> dataDistributionQueue(std::shared_ptr<IDDTxnProcessor> dbProc
state Future<Void> launchQueuedWorkTimeout = Never();
for (int i = 0; i < teamCollections.size(); i++) {
// FIXME: Use BgDDLoadBalance for disk rebalance too after DD simulation test proof.
// ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_OVERUTILIZED_TEAM));
// ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM));
ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_OVERUTILIZED_TEAM));
ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM));
if (SERVER_KNOBS->READ_SAMPLING_ENABLED) {
ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_READ_OVERUTIL_TEAM));
ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_READ_UNDERUTIL_TEAM));
}
ddQueueFutures.push_back(BgDDMountainChopper(&self, i));
ddQueueFutures.push_back(BgDDValleyFiller(&self, i));
}
ddQueueFutures.push_back(delayedAsyncVar(self.rawProcessingUnhealthy, processingUnhealthy, 0));
ddQueueFutures.push_back(delayedAsyncVar(self.rawProcessingWiggle, processingWiggle, 0));