merge readaware-better

This commit is contained in:
Xiaoxi Wang 2022-04-11 17:09:39 -07:00
commit 6c841cd32b
2 changed files with 122 additions and 5 deletions

View File

@ -66,7 +66,8 @@ ACTOR Future<Void> setDDIgnoreRebalanceSwitch(Reference<IDatabase> db, uint8_t D
loop {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
try {
Optional<Value> v = wait(safeThreadFutureToFuture(tr->get(fdb_cli::ddIgnoreRebalanceSpecialKey)));
state ThreadFuture<Optional<Value>> resultFuture = tr->get(fdb_cli::ddIgnoreRebalanceSpecialKey);
Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
uint8_t oldValue = 0; // nothing is disabled
if (v.present()) {
if (v.get().size() > 0) {

View File

@ -1051,7 +1051,7 @@ struct DDQueueData {
// return -1 if a.readload > b.readload
int greaterReadLoad(Reference<IDataDistributionTeam> a, Reference<IDataDistributionTeam> b) {
auto r1 = a->getLoadReadBandwidth(true, 2.0), r2 = b->getLoadReadBandwidth(true, 2.0);
auto r1 = a->getLoadReadBandwidth(true, 2), r2 = b->getLoadReadBandwidth(true, 2);
return r1 == r2 ? 0 : (r1 > r2 ? -1 : 1);
}
@ -1399,7 +1399,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
auto& destinationRef = healthyDestinations;
self->noErrorActors.add(
trigger([destinationRef, readLoad]() mutable { destinationRef.addReadInFlightToTeam(-readLoad); },
delay(SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL, TaskPriority::DataDistribution)));
delay(SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL)));
// onFinished.send( rs );
if (!error.code()) {
@ -1437,7 +1437,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
auto& destinationRef = healthyDestinations;
self->noErrorActors.add(
trigger([destinationRef, readLoad]() mutable { destinationRef.addReadInFlightToTeam(-readLoad); },
delay(SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL, TaskPriority::DataDistribution)));
delay(SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL)));
rd.completeDests.clear();
wait(delay(SERVER_KNOBS->RETRY_RELOCATESHARD_DELAY, TaskPriority::DataDistributionLaunch));
}
@ -1655,7 +1655,7 @@ ACTOR Future<Void> BgDDLoadRebalancer(DDQueueData* self, int teamCollectionIndex
try {
// FIXME: change back to BG_REBALANCE_SWITCH_CHECK_INTERVAL after test
state Future<Void> delayF = delayJittered(0.1, TaskPriority::DataDistributionLaunch);
state Future<Void> delayF = delay(0.1, TaskPriority::DataDistributionLaunch);
if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> val = wait(tr.get(rebalanceDDIgnoreKey));
@ -1746,6 +1746,122 @@ ACTOR Future<Void> BgDDLoadRebalancer(DDQueueData* self, int teamCollectionIndex
}
}
ACTOR Future<Void> BgDDMountainChopper(DDQueueData* self, int teamCollectionIndex) {
state double rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
state int resetCount = SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT;
state Transaction tr(self->cx);
state double lastRead = 0;
state bool skipCurrentLoop = false;
state bool disableReadBalance = false;
state bool disableDiskBalance = false;
loop {
state bool moved = false;
state Reference<IDataDistributionTeam> sourceTeam;
state Reference<IDataDistributionTeam> destTeam;
state GetTeamRequest srcReq;
state GetTeamRequest destReq;
state TraceEvent traceEvent("BgDDMountainChopper", self->distributorId);
traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval);
if (*self->lastLimited > 0) {
traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited);
}
try {
// FIXME: change back to BG_REBALANCE_SWITCH_CHECK_INTERVAL after test
state Future<Void> delayF = delay(0.1, TaskPriority::DataDistributionLaunch);
if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> val = wait(tr.get(rebalanceDDIgnoreKey));
lastRead = now();
if (!val.present()) {
// reset loop interval
if (skipCurrentLoop) {
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
}
skipCurrentLoop = false;
disableReadBalance = false;
disableDiskBalance = false;
} else {
if (val.get().size() > 0) {
int ddIgnore = BinaryReader::fromStringRef<int>(val.get(), Unversioned());
disableDiskBalance = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0;
disableReadBalance = (ddIgnore & DDIgnore::REBALANCE_READ) > 0;
skipCurrentLoop = disableReadBalance && disableDiskBalance;
} else {
skipCurrentLoop = true;
}
}
}
traceEvent.detail("Enabled",
skipCurrentLoop ? "None" : (disableReadBalance ? "NoReadBalance" : "NoDiskBalance"));
wait(delayF);
if (skipCurrentLoop) {
// set loop interval to avoid busy wait here.
rebalancePollingInterval =
std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
continue;
}
traceEvent.detail("QueuedRelocations",
self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM]);
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] <
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
// FIXME: read balance and disk balance shouldn't be mutual exclusive in the future
srcReq = GetTeamRequest(true, true, false, true);
destReq = GetTeamRequest(true, false, true, false);
if (!disableReadBalance) {
srcReq.teamSorter = lessReadLoad;
destReq.teamSorter = greaterReadLoad;
}
// clang-format off
wait(getSrcDestTeams(self, teamCollectionIndex, srcReq, destReq, &sourceTeam, &destTeam,
SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM,&traceEvent));
if (sourceTeam.isValid() && destTeam.isValid()) {
if (!disableReadBalance) {
wait(store(moved,rebalanceReadLoad(self,SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM,
sourceTeam, destTeam,teamCollectionIndex == 0,
&traceEvent)));
} else {
wait(store(moved,rebalanceTeams(self,SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM,
sourceTeam, destTeam,teamCollectionIndex == 0,
&traceEvent)));
}
}
// clang-format on
moved ? resetCount = 0 : resetCount++;
}
if (now() - (*self->lastLimited) < SERVER_KNOBS->BG_DD_SATURATION_DELAY) {
rebalancePollingInterval = std::min(SERVER_KNOBS->BG_DD_MAX_WAIT,
rebalancePollingInterval * SERVER_KNOBS->BG_DD_INCREASE_RATE);
} else {
rebalancePollingInterval = std::max(SERVER_KNOBS->BG_DD_MIN_WAIT,
rebalancePollingInterval / SERVER_KNOBS->BG_DD_DECREASE_RATE);
}
if (resetCount >= SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT &&
rebalancePollingInterval < SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL) {
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
resetCount = SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT;
}
traceEvent.detail("ResetCount", resetCount);
tr.reset();
} catch (Error& e) {
// Log actor_cancelled because it's not legal to suppress an event that's initialized
traceEvent.errorUnsuppressed(e);
wait(tr.onError(e));
}
traceEvent.detail("Moved", moved);
traceEvent.log();
}
}
ACTOR Future<Void> BgDDValleyFiller(DDQueueData* self, int teamCollectionIndex) {
state double rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
state int resetCount = SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT;