add new priority in RelocateData

This commit is contained in:
Xiaoxi Wang 2022-04-12 16:22:17 -07:00
parent 61a1f7683b
commit 5e96bacb5b
1 changed files with 34 additions and 31 deletions

View File

@ -37,6 +37,19 @@
#define WORK_FULL_UTILIZATION 10000 // This is not a knob; it is a fixed point scaling factor!
inline bool isDiskRebalancePriority(int priority) {
return priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM;
}
inline bool isMountainChopperPriority(int priority) {
return priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ||
priority == SERVER_KNOBS->PRIORITY_REBALANCE_READ_OVERUTIL_TEAM;
}
inline bool isValleyFillerPriority(int priority) {
return priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
priority == SERVER_KNOBS->PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM;
}
struct RelocateData {
KeyRange keys;
int priority;
@ -61,8 +74,7 @@ struct RelocateData {
: keys(rs.keys), priority(rs.priority), boundaryPriority(isBoundaryPriority(rs.priority) ? rs.priority : -1),
healthPriority(isHealthPriority(rs.priority) ? rs.priority : -1), reason(rs.reason), startTime(now()),
randomId(deterministicRandom()->randomUniqueID()), workFactor(0),
wantsNewServers(rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ||
rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
wantsNewServers(isMountainChopperPriority(rs.priority) || isValleyFillerPriority(rs.priority) ||
rs.priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD ||
rs.priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT),
cancellable(true), interval("QueuedRelocation") {}
@ -1073,18 +1085,6 @@ static std::string destServersString(std::vector<std::pair<Reference<IDataDistri
return std::move(ss).str();
}
inline bool isDiskRebalancePriority(int priority) {
return priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM;
}
inline bool isMountainChopperPriority(int priority) {
return priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ||
priority == SERVER_KNOBS->PRIORITY_REBALANCE_READ_OVERUTIL_TEAM;
}
inline bool isValleyFillerPriority(int priority) {
return priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
priority == SERVER_KNOBS->PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM;
}
// This actor relocates the specified keys to a good place.
// The inFlightActor key range map stores the actor for each RelocateData
ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd, const DDEnabledState* ddEnabledState) {
@ -1151,11 +1151,8 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT)
inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
auto req = GetTeamRequest(rd.wantsNewServers,
isValleyFillerPriority(rd.priority),
true,
false,
inflightPenalty);
auto req = GetTeamRequest(
rd.wantsNewServers, isValleyFillerPriority(rd.priority), true, false, inflightPenalty);
req.src = rd.src;
req.completeSources = rd.completeSources;
@ -1638,6 +1635,7 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
state double lastRead = 0;
state bool skipCurrentLoop = false;
state const bool readRebalance = !isDiskRebalancePriority(ddPriority);
state const char* eventName = isMountainChopperPriority(ddPriority) ? "BgDDMountainChopper" : "BgDDValleyFiller";
loop {
state bool moved = false;
@ -1645,9 +1643,10 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
state Reference<IDataDistributionTeam> destTeam;
state GetTeamRequest srcReq;
state GetTeamRequest destReq;
state TraceEvent traceEvent(isMountainChopperPriority(ddPriority) ? "BgDDMountainChopper" : "BgDDValleyFiller",
self->distributorId);
traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval);
state TraceEvent traceEvent(eventName, self->distributorId);
traceEvent.suppressFor(5.0)
.detail("PollingInterval", rebalancePollingInterval)
.detail("Rebalance", readRebalance ? "Read" : "Disk");
if (*self->lastLimited > 0) {
traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited);
@ -1658,6 +1657,7 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
state Future<Void> delayF = delay(0.1, TaskPriority::DataDistributionLaunch);
if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> val = wait(tr.get(rebalanceDDIgnoreKey));
lastRead = now();
if (!val.present()) {
@ -1680,9 +1680,7 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
}
}
traceEvent.detail("Enabled",
readRebalance ? (skipCurrentLoop ? "NoReadRebalance" : "ReadRebalance")
: (skipCurrentLoop ? "NoDiskRebalance" : "DiskRebalance"));
traceEvent.detail("Enabled", !skipCurrentLoop);
wait(delayF);
if (skipCurrentLoop) {
@ -1693,8 +1691,8 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
}
traceEvent.detail("QueuedRelocations", self->priority_relocations[ddPriority]);
// FIXME: find a proper number for SERVER_KNOBS->DD_REBALANCE_PARALLELISM
if (self->priority_relocations[ddPriority] < 25) {
if (self->priority_relocations[ddPriority] < SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
if (isMountainChopperPriority(ddPriority)) {
srcReq = GetTeamRequest(true, true, false, true);
destReq = GetTeamRequest(true, false, true, false);
@ -1785,7 +1783,7 @@ ACTOR Future<Void> BgDDMountainChopper(DDQueueData* self, int teamCollectionInde
disableDiskBalance = false;
} else {
if (val.get().size() > 0) {
int ddIgnore = BinaryReader::fromStringRef<int>(val.get(), Unversioned());
int ddIgnore = BinaryReader::fromStringRef<uint8_t>(val.get(), Unversioned());
disableDiskBalance = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0;
disableReadBalance = (ddIgnore & DDIgnore::REBALANCE_READ) > 0;
skipCurrentLoop = disableReadBalance && disableDiskBalance;
@ -1901,7 +1899,7 @@ ACTOR Future<Void> BgDDValleyFiller(DDQueueData* self, int teamCollectionIndex)
disableDiskBalance = false;
} else if (val.present()) {
if (val.get().size() > 0) {
int ddIgnore = BinaryReader::fromStringRef<int>(val.get(), Unversioned());
int ddIgnore = BinaryReader::fromStringRef<uint8_t>(val.get(), Unversioned());
disableDiskBalance = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0;
disableReadBalance = (ddIgnore & DDIgnore::REBALANCE_READ) > 0;
skipCurrentLoop = disableReadBalance && disableDiskBalance;
@ -2020,8 +2018,13 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
for (int i = 0; i < teamCollections.size(); i++) {
balancingFutures.push_back(BgDDLoadRebalance(&self, i, SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM));
balancingFutures.push_back(BgDDLoadRebalance(&self, i, SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM));
balancingFutures.push_back(BgDDLoadRebalance(&self, i, SERVER_KNOBS->PRIORITY_REBALANCE_READ_OVERUTIL_TEAM));
balancingFutures.push_back(BgDDLoadRebalance(&self, i, SERVER_KNOBS->PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM));
if(SERVER_KNOBS->READ_SAMPLING_ENABLED == true) {
balancingFutures.push_back(
BgDDLoadRebalance(&self, i, SERVER_KNOBS->PRIORITY_REBALANCE_READ_OVERUTIL_TEAM));
balancingFutures.push_back(
BgDDLoadRebalance(&self, i, SERVER_KNOBS->PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM));
}
// balancingFutures.push_back(BgDDMountainChopper(&self, i));
// balancingFutures.push_back(BgDDValleyFiller(&self, i));
}
balancingFutures.push_back(delayedAsyncVar(self.rawProcessingUnhealthy, processingUnhealthy, 0));