Move DD queue code over to using movement-reasons rather than priority (#7614)

* Use enum values to invoke priority checking

* Add an explicit isDataMovementForReadBalancing function

* Set up RelocateShard in terms of data movement reason instead of priority

* Remove isMountainChopperPriority

* Remove isDiskRebalancePriority

* Fix formatting

* Fix misnamed DataMovementReason::TEAM_HEALTHY

Co-authored-by: Zhongxing Zhang <zhongxing.zhang@snowflake.com>
Bharadwaj V.R 2022-07-25 00:50:37 -07:00 committed by GitHub
parent 7b6f1ca712
commit dcf09fd691
4 changed files with 38 additions and 36 deletions
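
For orientation before the per-file hunks: the pattern this commit moves to is that a call site names the reason for a data movement (DataMovementReason) and the numeric priority is derived from that reason in one place, instead of each caller passing a SERVER_KNOBS->PRIORITY_* value directly. Below is a minimal, self-contained sketch of that pattern; the enum values mirror the diff, but the priority numbers and the RelocateShardSketch type are placeholders rather than FoundationDB's actual definitions.

#include <cassert>

// Placeholder subset of the reason enum introduced in the header changes below.
enum class DataMovementReason { INVALID, RECOVER_MOVE, MERGE_SHARD, SPLIT_SHARD, TEAM_UNHEALTHY };

// Stand-in for the real mapping onto SERVER_KNOBS->PRIORITY_* (values here are arbitrary).
inline int dataMovementPriority(DataMovementReason reason) {
	switch (reason) {
	case DataMovementReason::RECOVER_MOVE:   return 110;
	case DataMovementReason::MERGE_SHARD:    return 340;
	case DataMovementReason::SPLIT_SHARD:    return 950;
	case DataMovementReason::TEAM_UNHEALTHY: return 700;
	default:                                 return -1; // INVALID
	}
}

// Stand-in for RelocateShard: callers supply a reason, never a raw priority.
struct RelocateShardSketch {
	DataMovementReason moveReason;
	int priority; // still populated, so code that already consumes an integer priority keeps working
	explicit RelocateShardSketch(DataMovementReason r) : moveReason(r), priority(dataMovementPriority(r)) {}
};

int main() {
	RelocateShardSketch rs(DataMovementReason::TEAM_UNHEALTHY);
	assert(rs.priority == dataMovementPriority(DataMovementReason::TEAM_UNHEALTHY));
	return 0;
}

The derived priority stays on the struct because downstream code (for example the health/boundary priority fields of RelocateData in the queue hunks below) still reads an integer priority; only the way that integer is chosen changes.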


@@ -472,10 +472,10 @@ public:
if (!unhealthy && self->configuration.usableRegions > 1) {
unhealthy = iShard.remoteSrc.size() != self->configuration.storageTeamSize;
}
self->relocationProducer.send(RelocateShard(keys,
unhealthy ? SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY
: SERVER_KNOBS->PRIORITY_RECOVER_MOVE,
RelocateReason::OTHER));
self->relocationProducer.send(
RelocateShard(keys,
unhealthy ? DataMovementReason::TEAM_UNHEALTHY : DataMovementReason::RECOVER_MOVE,
RelocateReason::OTHER));
}
wait(yield(TaskPriority::DataDistribution));
@@ -489,7 +489,7 @@ public:
for (; it != self->initData->dataMoveMap.ranges().end(); ++it) {
const DataMoveMetaData& meta = it.value()->meta;
if (it.value()->isCancelled() || (it.value()->valid && !CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA)) {
RelocateShard rs(meta.range, SERVER_KNOBS->PRIORITY_RECOVER_MOVE, RelocateReason::OTHER);
RelocateShard rs(meta.range, DataMovementReason::RECOVER_MOVE, RelocateReason::OTHER);
rs.dataMoveId = meta.id;
rs.cancelled = true;
self->relocationProducer.send(rs);
@@ -498,7 +498,7 @@ public:
TraceEvent(SevDebug, "DDInitFoundDataMove", self->ddId).detail("DataMove", meta.toString());
ASSERT(meta.range == it.range());
// TODO: Persist priority in DataMoveMetaData.
RelocateShard rs(meta.range, SERVER_KNOBS->PRIORITY_RECOVER_MOVE, RelocateReason::OTHER);
RelocateShard rs(meta.range, DataMovementReason::RECOVER_MOVE, RelocateReason::OTHER);
rs.dataMoveId = meta.id;
rs.dataMove = it.value();
std::vector<ShardsAffectedByTeamFailure::Team> teams;
@@ -578,6 +578,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
state Reference<AsyncVar<bool>> processingUnhealthy(new AsyncVar<bool>(false));
state Reference<AsyncVar<bool>> processingWiggle(new AsyncVar<bool>(false));
state Promise<Void> readyToStart;
self->shardsAffectedByTeamFailure = makeReference<ShardsAffectedByTeamFailure>();
wait(self->resumeRelocations());


@@ -41,12 +41,6 @@
typedef Reference<IDataDistributionTeam> ITeamRef;
typedef std::pair<ITeamRef, ITeamRef> SrcDestTeamPair;
// FIXME: Always use DataMovementReason to invoke these functions.
inline bool isDiskRebalancePriority(int priority) {
return priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM;
}
inline bool isDataMovementForDiskBalancing(DataMovementReason reason) {
return reason == DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM ||
reason == DataMovementReason::REBALANCE_OVERUTILIZED_TEAM;
@@ -57,16 +51,12 @@ inline bool isDataMovementForReadBalancing(DataMovementReason reason) {
reason == DataMovementReason::REBALANCE_READ_UNDERUTIL_TEAM;
}
inline bool isMountainChopperPriority(int priority) {
return priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ||
priority == SERVER_KNOBS->PRIORITY_REBALANCE_READ_OVERUTIL_TEAM;
}
inline bool isDataMovementForMountainChopper(DataMovementReason reason) {
return reason == DataMovementReason::REBALANCE_OVERUTILIZED_TEAM ||
reason == DataMovementReason::REBALANCE_READ_OVERUTIL_TEAM;
}
// FIXME: Always use DataMovementReason to invoke these functions.
inline bool isValleyFillerPriority(int priority) {
return priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
priority == SERVER_KNOBS->PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM;
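
With the priority-based helpers removed, classification of a movement is done purely on the enum, so checks like the wantsNewServers expression further down in this file operate on rs.moveReason instead of comparing rs.priority against knobs. A self-contained sketch of how that reads: the mountain-chopper predicate body is copied from this hunk, the valley-filler body follows the same pattern as isValleyFillerPriority, and the enum subset and wantsNewServersFor wrapper are illustrative only, not functions from the source.

#include <cassert>

enum class DataMovementReason {
	INVALID,
	REBALANCE_UNDERUTILIZED_TEAM,
	REBALANCE_OVERUTILIZED_TEAM,
	REBALANCE_READ_OVERUTIL_TEAM,
	REBALANCE_READ_UNDERUTIL_TEAM,
	SPLIT_SHARD,
	TEAM_REDUNDANT
};

inline bool isDataMovementForMountainChopper(DataMovementReason reason) {
	return reason == DataMovementReason::REBALANCE_OVERUTILIZED_TEAM ||
	       reason == DataMovementReason::REBALANCE_READ_OVERUTIL_TEAM;
}

inline bool isDataMovementForValleyFiller(DataMovementReason reason) {
	return reason == DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM ||
	       reason == DataMovementReason::REBALANCE_READ_UNDERUTIL_TEAM;
}

// Illustrative wrapper mirroring the wantsNewServers initializer in the RelocateData hunk below.
inline bool wantsNewServersFor(DataMovementReason reason) {
	return isDataMovementForMountainChopper(reason) || isDataMovementForValleyFiller(reason) ||
	       reason == DataMovementReason::SPLIT_SHARD || reason == DataMovementReason::TEAM_REDUNDANT;
}

int main() {
	assert(wantsNewServersFor(DataMovementReason::SPLIT_SHARD));
	assert(wantsNewServersFor(DataMovementReason::REBALANCE_READ_OVERUTIL_TEAM));
	assert(!wantsNewServersFor(DataMovementReason::INVALID));
	return 0;
}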
@@ -80,6 +70,9 @@ inline bool isDataMovementForValleyFiller(DataMovementReason reason) {
int dataMovementPriority(DataMovementReason reason) {
int priority;
switch (reason) {
case DataMovementReason::INVALID:
priority = -1;
break;
case DataMovementReason::RECOVER_MOVE:
priority = SERVER_KNOBS->PRIORITY_RECOVER_MOVE;
break;
@@ -162,9 +155,9 @@ struct RelocateData {
: keys(rs.keys), priority(rs.priority), boundaryPriority(isBoundaryPriority(rs.priority) ? rs.priority : -1),
healthPriority(isHealthPriority(rs.priority) ? rs.priority : -1), reason(rs.reason), startTime(now()),
randomId(deterministicRandom()->randomUniqueID()), dataMoveId(rs.dataMoveId), workFactor(0),
wantsNewServers(isMountainChopperPriority(rs.priority) || isValleyFillerPriority(rs.priority) ||
rs.priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD ||
rs.priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT),
wantsNewServers(
isDataMovementForMountainChopper(rs.moveReason) || isDataMovementForValleyFiller(rs.moveReason) ||
rs.moveReason == DataMovementReason::SPLIT_SHARD || rs.moveReason == DataMovementReason::TEAM_REDUNDANT),
cancellable(true), interval("QueuedRelocation"), dataMove(rs.dataMove) {
if (dataMove != nullptr) {
this->src.insert(this->src.end(), dataMove->meta.src.begin(), dataMove->meta.src.end());
@@ -1738,7 +1731,7 @@ inline double getWorstCpu(const HealthMetrics& metrics, const std::vector<UID>&
// Move the shard with the top K highest read density of sourceTeam's to destTeam if sourceTeam has much more read load
// than destTeam
ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
int priority,
DataMovementReason moveReason,
Reference<IDataDistributionTeam> sourceTeam,
Reference<IDataDistributionTeam> destTeam,
bool primary,
@@ -1806,7 +1799,7 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
ShardsAffectedByTeamFailure::Team(sourceTeam->getServerIDs(), primary));
for (int i = 0; i < shards.size(); i++) {
if (shard == shards[i]) {
self->output.send(RelocateShard(shard, priority, RelocateReason::REBALANCE_READ));
self->output.send(RelocateShard(shard, moveReason, RelocateReason::REBALANCE_READ));
self->updateLastAsSource(sourceTeam->getServerIDs());
return true;
}
@@ -1817,7 +1810,7 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
// Move a random shard from sourceTeam if sourceTeam has much more data than provided destTeam
ACTOR static Future<bool> rebalanceTeams(DDQueueData* self,
int priority,
DataMovementReason moveReason,
Reference<IDataDistributionTeam const> sourceTeam,
Reference<IDataDistributionTeam const> destTeam,
bool primary,
@@ -1878,7 +1871,7 @@ ACTOR static Future<bool> rebalanceTeams(DDQueueData* self,
ShardsAffectedByTeamFailure::Team(sourceTeam->getServerIDs(), primary));
for (int i = 0; i < shards.size(); i++) {
if (moveShard == shards[i]) {
self->output.send(RelocateShard(moveShard, priority, RelocateReason::REBALANCE_DISK));
self->output.send(RelocateShard(moveShard, moveReason, RelocateReason::REBALANCE_DISK));
return true;
}
}
@@ -2007,9 +2000,9 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
// clang-format off
if (sourceTeam.isValid() && destTeam.isValid()) {
if (readRebalance) {
wait(store(moved,rebalanceReadLoad(self, ddPriority, sourceTeam, destTeam, teamCollectionIndex == 0, &traceEvent)));
wait(store(moved,rebalanceReadLoad(self, reason, sourceTeam, destTeam, teamCollectionIndex == 0, &traceEvent)));
} else {
wait(store(moved,rebalanceTeams(self, ddPriority, sourceTeam, destTeam, teamCollectionIndex == 0, &traceEvent)));
wait(store(moved,rebalanceTeams(self, reason, sourceTeam, destTeam, teamCollectionIndex == 0, &traceEvent)));
}
}
// clang-format on
@@ -2105,7 +2098,7 @@ ACTOR Future<Void> BgDDMountainChopper(DDQueueData* self, int teamCollectionInde
if (loadedTeam.first.present()) {
bool _moved = wait(rebalanceTeams(self,
SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM,
DataMovementReason::REBALANCE_OVERUTILIZED_TEAM,
loadedTeam.first.get(),
randomTeam.first.get(),
teamCollectionIndex == 0,
@@ -2204,7 +2197,7 @@ ACTOR Future<Void> BgDDValleyFiller(DDQueueData* self, int teamCollectionIndex)
if (unloadedTeam.first.present()) {
bool _moved = wait(rebalanceTeams(self,
SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM,
DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM,
randomTeam.first.get(),
unloadedTeam.first.get(),
teamCollectionIndex == 0,
@@ -2266,8 +2259,8 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
for (int i = 0; i < teamCollections.size(); i++) {
// FIXME: Use BgDDLoadBalance for disk rebalance too after DD simulation test proof.
// balancingFutures.push_back(BgDDLoadRebalance(&self, i, SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM));
// balancingFutures.push_back(BgDDLoadRebalance(&self, i, SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM));
// balancingFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_OVERUTILIZED_TEAM));
// balancingFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM));
if (SERVER_KNOBS->READ_SAMPLING_ENABLED) {
balancingFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_READ_OVERUTIL_TEAM));
balancingFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_READ_UNDERUTIL_TEAM));


@@ -524,12 +524,12 @@ ACTOR Future<Void> shardSplitter(DataDistributionTracker* self,
for (int i = 0; i < skipRange; i++) {
KeyRangeRef r(splitKeys[i], splitKeys[i + 1]);
self->shardsAffectedByTeamFailure->defineShard(r);
self->output.send(RelocateShard(r, SERVER_KNOBS->PRIORITY_SPLIT_SHARD, RelocateReason::OTHER));
self->output.send(RelocateShard(r, DataMovementReason::SPLIT_SHARD, RelocateReason::OTHER));
}
for (int i = numShards - 1; i > skipRange; i--) {
KeyRangeRef r(splitKeys[i], splitKeys[i + 1]);
self->shardsAffectedByTeamFailure->defineShard(r);
self->output.send(RelocateShard(r, SERVER_KNOBS->PRIORITY_SPLIT_SHARD, RelocateReason::OTHER));
self->output.send(RelocateShard(r, DataMovementReason::SPLIT_SHARD, RelocateReason::OTHER));
}
self->sizeChanges.add(changeSizes(self, keys, shardSize->get().get().metrics.bytes));
@@ -675,7 +675,7 @@ Future<Void> shardMerger(DataDistributionTracker* self,
}
restartShardTrackers(self, mergeRange, ShardMetrics(endingStats, lastLowBandwidthStartTime, shardCount));
self->shardsAffectedByTeamFailure->defineShard(mergeRange);
self->output.send(RelocateShard(mergeRange, SERVER_KNOBS->PRIORITY_MERGE_SHARD, RelocateReason::OTHER));
self->output.send(RelocateShard(mergeRange, DataMovementReason::MERGE_SHARD, RelocateReason::OTHER));
// We are about to be cancelled by the call to restartShardTrackers
return Void();


@@ -39,6 +39,7 @@ enum class RelocateReason { INVALID = -1, OTHER, REBALANCE_DISK, REBALANCE_READ
// One-to-one relationship to the priority knobs
enum class DataMovementReason {
INVALID,
RECOVER_MOVE,
REBALANCE_UNDERUTILIZED_TEAM,
REBALANCE_OVERUTILIZED_TEAM,
@@ -60,6 +61,8 @@ enum class DataMovementReason {
struct DDShardInfo;
extern int dataMovementPriority(DataMovementReason moveReason);
// Represents a data move in DD.
struct DataMove {
DataMove() : meta(DataMoveMetaData()), restore(false), valid(false), cancelled(false) {}
@@ -89,9 +92,14 @@ struct RelocateShard {
std::shared_ptr<DataMove> dataMove; // Not null if this is a restored data move.
UID dataMoveId;
RelocateReason reason;
RelocateShard() : priority(0), cancelled(false), dataMoveId(anonymousShardId), reason(RelocateReason::INVALID) {}
RelocateShard(KeyRange const& keys, int priority, RelocateReason reason)
: keys(keys), priority(priority), cancelled(false), dataMoveId(anonymousShardId), reason(reason) {}
DataMovementReason moveReason;
RelocateShard()
: priority(0), cancelled(false), dataMoveId(anonymousShardId), reason(RelocateReason::INVALID),
moveReason(DataMovementReason::INVALID) {}
RelocateShard(KeyRange const& keys, DataMovementReason moveReason, RelocateReason reason)
: keys(keys), cancelled(false), dataMoveId(anonymousShardId), reason(reason), moveReason(moveReason) {
priority = dataMovementPriority(moveReason);
}
bool isRestore() const { return this->dataMove != nullptr; }
};
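
As a usage note on the two constructors above: the default constructor leaves moveReason at DataMovementReason::INVALID with priority 0, while the reason-taking constructor fills both moveReason and the derived priority, so every shard queued through it carries a consistent reason/priority pair. Note that the default-constructed shard keeps priority 0 rather than dataMovementPriority(INVALID), which the earlier hunk sets to -1. A small self-contained sketch of that behaviour, with placeholder types and priority values rather than the real knobs:

#include <cassert>

enum class DataMovementReason { INVALID, RECOVER_MOVE, MERGE_SHARD };

// Placeholder mapping; in the source the values come from SERVER_KNOBS->PRIORITY_*.
inline int dataMovementPriority(DataMovementReason reason) {
	switch (reason) {
	case DataMovementReason::RECOVER_MOVE: return 110;
	case DataMovementReason::MERGE_SHARD:  return 340;
	default:                               return -1;
	}
}

// Placeholder mirroring the two RelocateShard constructors in the hunk above.
struct RelocateShardSketch {
	int priority;
	DataMovementReason moveReason;
	RelocateShardSketch() : priority(0), moveReason(DataMovementReason::INVALID) {}
	explicit RelocateShardSketch(DataMovementReason r) : priority(dataMovementPriority(r)), moveReason(r) {}
};

int main() {
	RelocateShardSketch byDefault;
	assert(byDefault.priority == 0 && byDefault.moveReason == DataMovementReason::INVALID);

	RelocateShardSketch merge(DataMovementReason::MERGE_SHARD);
	assert(merge.priority == dataMovementPriority(DataMovementReason::MERGE_SHARD));
	return 0;
}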