remove polling interval; uncomment suppressFor
This commit is contained in:
parent
909a7a92a1
commit
0579b577ba
|
@ -1699,7 +1699,6 @@ ACTOR Future<Void> getSrcDestTeams(DDQueueData* self,
|
|||
}
|
||||
|
||||
ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex, int ddPriority) {
|
||||
state double rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
|
||||
state int resetCount = SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT;
|
||||
state Transaction tr(self->cx);
|
||||
state double lastRead = 0;
|
||||
|
@ -1716,27 +1715,19 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
|
|||
state GetTeamRequest srcReq;
|
||||
state GetTeamRequest destReq;
|
||||
state TraceEvent traceEvent(eventName, self->distributorId);
|
||||
// FIXME: uncomment
|
||||
traceEvent // .suppressFor(5.0)
|
||||
.detail("PollingInterval", rebalancePollingInterval)
|
||||
traceEvent.suppressFor(5.0)
|
||||
.detail("PollingInterval", SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL)
|
||||
.detail("Rebalance", readRebalance ? "Read" : "Disk");
|
||||
|
||||
if (*self->lastLimited > 0) {
|
||||
traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited);
|
||||
}
|
||||
|
||||
try {
|
||||
delayF = delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch);
|
||||
// NOTE: the DD throttling relies on DDQueue
|
||||
delayF = delay(SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL, TaskPriority::DataDistributionLaunch);
|
||||
if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
Optional<Value> val = wait(tr.get(rebalanceDDIgnoreKey));
|
||||
lastRead = now();
|
||||
if (!val.present()) {
|
||||
// reset loop interval
|
||||
if (skipCurrentLoop) {
|
||||
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
|
||||
}
|
||||
skipCurrentLoop = false;
|
||||
} else {
|
||||
if (val.get().size() > 0) {
|
||||
|
@ -1756,9 +1747,6 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
|
|||
|
||||
wait(delayF);
|
||||
if (skipCurrentLoop) {
|
||||
// set loop interval to avoid busy wait here.
|
||||
rebalancePollingInterval =
|
||||
std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
|
||||
tr.reset();
|
||||
continue;
|
||||
}
|
||||
|
@ -1807,24 +1795,6 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
|
|||
moved ? resetCount = 0 : resetCount++;
|
||||
}
|
||||
|
||||
// NOTE: We don’t want read rebalancing to be slowed down when Ratekeeper kicks in
|
||||
// TODO: consider dynamic polling interval for read rebalance ?
|
||||
if (isDiskRebalancePriority(ddPriority)) {
|
||||
if (now() - (*self->lastLimited) < SERVER_KNOBS->BG_DD_SATURATION_DELAY) {
|
||||
rebalancePollingInterval = std::min(SERVER_KNOBS->BG_DD_MAX_WAIT,
|
||||
rebalancePollingInterval * SERVER_KNOBS->BG_DD_INCREASE_RATE);
|
||||
} else {
|
||||
rebalancePollingInterval = std::max(SERVER_KNOBS->BG_DD_MIN_WAIT,
|
||||
rebalancePollingInterval / SERVER_KNOBS->BG_DD_DECREASE_RATE);
|
||||
}
|
||||
}
|
||||
|
||||
if (resetCount >= SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT &&
|
||||
rebalancePollingInterval < SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL) {
|
||||
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
|
||||
resetCount = SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT;
|
||||
}
|
||||
|
||||
traceEvent.detail("ResetCount", resetCount);
|
||||
tr.reset();
|
||||
} catch (Error& e) {
|
||||
|
@ -1848,10 +1818,7 @@ ACTOR Future<Void> BgDDMountainChopper(DDQueueData* self, int teamCollectionInde
|
|||
state std::pair<Optional<Reference<IDataDistributionTeam>>, bool> randomTeam;
|
||||
state bool moved = false;
|
||||
state TraceEvent traceEvent("BgDDMountainChopper_Old", self->distributorId);
|
||||
// FIXME: uncomment
|
||||
traceEvent // .suppressFor(5.0)
|
||||
.detail("PollingInterval", rebalancePollingInterval)
|
||||
.detail("Rebalance", "Disk");
|
||||
traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval).detail("Rebalance", "Disk");
|
||||
|
||||
if (*self->lastLimited > 0) {
|
||||
traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited);
|
||||
|
@ -1974,10 +1941,7 @@ ACTOR Future<Void> BgDDValleyFiller(DDQueueData* self, int teamCollectionIndex)
|
|||
state std::pair<Optional<Reference<IDataDistributionTeam>>, bool> randomTeam;
|
||||
state bool moved = false;
|
||||
state TraceEvent traceEvent("BgDDValleyFiller_Old", self->distributorId);
|
||||
// FIXME: uncomment
|
||||
traceEvent //.suppressFor(5.0)
|
||||
.detail("PollingInterval", rebalancePollingInterval)
|
||||
.detail("Rebalance", "Disk");
|
||||
traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval).detail("Rebalance", "Disk");
|
||||
|
||||
if (*self->lastLimited > 0) {
|
||||
traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited);
|
||||
|
|
Loading…
Reference in New Issue