Merge pull request #2234 from etschannen/feature-split-priority
Raised the data distribution priority of splitting shards
This commit is contained in:
commit
80015146cd
|
@ -189,7 +189,7 @@ public:
|
|||
int priority;
|
||||
|
||||
explicit TCTeamInfo(vector<Reference<TCServerInfo>> const& servers)
|
||||
: servers(servers), healthy(true), priority(PRIORITY_TEAM_HEALTHY), wrongConfiguration(false) {
|
||||
: servers(servers), healthy(true), priority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY), wrongConfiguration(false) {
|
||||
if (servers.empty()) {
|
||||
TraceEvent(SevInfo, "ConstructTCTeamFromEmptyServers");
|
||||
}
|
||||
|
@ -2865,25 +2865,25 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
|
|||
state int lastPriority = team->getPriority();
|
||||
if( serversLeft < self->configuration.storageTeamSize ) {
|
||||
if( serversLeft == 0 )
|
||||
team->setPriority( PRIORITY_TEAM_0_LEFT );
|
||||
team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_0_LEFT );
|
||||
else if( serversLeft == 1 )
|
||||
team->setPriority( PRIORITY_TEAM_1_LEFT );
|
||||
team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_1_LEFT );
|
||||
else if( serversLeft == 2 )
|
||||
team->setPriority( PRIORITY_TEAM_2_LEFT );
|
||||
team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_2_LEFT );
|
||||
else
|
||||
team->setPriority( PRIORITY_TEAM_UNHEALTHY );
|
||||
team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY );
|
||||
}
|
||||
else if ( badTeam || anyWrongConfiguration ) {
|
||||
if ( redundantTeam ) {
|
||||
team->setPriority( PRIORITY_TEAM_REDUNDANT );
|
||||
team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT );
|
||||
} else {
|
||||
team->setPriority( PRIORITY_TEAM_UNHEALTHY );
|
||||
team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY );
|
||||
}
|
||||
}
|
||||
else if( anyUndesired )
|
||||
team->setPriority( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER );
|
||||
team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER );
|
||||
else
|
||||
team->setPriority( PRIORITY_TEAM_HEALTHY );
|
||||
team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_HEALTHY );
|
||||
|
||||
if(lastPriority != team->getPriority()) {
|
||||
self->priority_teams[lastPriority]--;
|
||||
|
@ -2901,13 +2901,13 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
|
|||
|
||||
for(int i=0; i<shards.size(); i++) {
|
||||
int maxPriority = team->getPriority();
|
||||
if(maxPriority < PRIORITY_TEAM_0_LEFT) {
|
||||
if(maxPriority < SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) {
|
||||
auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
|
||||
for( int j=0; j < teams.first.size()+teams.second.size(); j++) {
|
||||
// t is the team in primary DC or the remote DC
|
||||
auto& t = j < teams.first.size() ? teams.first[j] : teams.second[j-teams.first.size()];
|
||||
if( !t.servers.size() ) {
|
||||
maxPriority = PRIORITY_TEAM_0_LEFT;
|
||||
maxPriority = SERVER_KNOBS->PRIORITY_TEAM_0_LEFT;
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -2931,8 +2931,8 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
|
|||
// false We want to differentiate the redundant_team from unhealthy_team in
|
||||
// terms of relocate priority
|
||||
maxPriority =
|
||||
std::max<int>(maxPriority, redundantTeam ? PRIORITY_TEAM_REDUNDANT
|
||||
: PRIORITY_TEAM_UNHEALTHY);
|
||||
std::max<int>(maxPriority, redundantTeam ? SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT
|
||||
: SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY);
|
||||
}
|
||||
} else {
|
||||
TEST(true); // A removed server is still associated with a team in SABTF
|
||||
|
@ -4174,9 +4174,21 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
|
|||
.detail( "InFlight", 0 )
|
||||
.detail( "InQueue", 0 )
|
||||
.detail( "AverageShardSize", -1 )
|
||||
.detail( "LowPriorityRelocations", 0 )
|
||||
.detail( "HighPriorityRelocations", 0 )
|
||||
.detail( "UnhealthyRelocations", 0 )
|
||||
.detail( "HighestPriority", 0 )
|
||||
.detail( "BytesWritten", 0 )
|
||||
.detail( "PriorityRecoverMove", 0 )
|
||||
.detail( "PriorityRebalanceUnderutilizedTeam", 0 )
|
||||
.detail( "PriorityRebalannceOverutilizedTeam", 0)
|
||||
.detail( "PriorityTeamHealthy", 0 )
|
||||
.detail( "PriorityTeamContainsUndesiredServer", 0 )
|
||||
.detail( "PriorityTeamRedundant", 0 )
|
||||
.detail( "PriorityMergeShard", 0 )
|
||||
.detail( "PriorityTeamUnhealthy", 0 )
|
||||
.detail( "PriorityTeam2Left", 0 )
|
||||
.detail( "PriorityTeam1Left", 0 )
|
||||
.detail( "PriorityTeam0Left", 0 )
|
||||
.detail( "PrioritySplitShard", 0 )
|
||||
.trackLatest( "MovingData" );
|
||||
|
||||
TraceEvent("TotalDataInFlight", self->ddId).detail("Primary", true).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", 0).trackLatest("TotalDataInFlight");
|
||||
|
@ -4219,7 +4231,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
|
|||
if (!unhealthy && configuration.usableRegions > 1) {
|
||||
unhealthy = initData->shards[shard].remoteSrc.size() != configuration.storageTeamSize;
|
||||
}
|
||||
output.send( RelocateShard( keys, unhealthy ? PRIORITY_TEAM_UNHEALTHY : PRIORITY_RECOVER_MOVE ) );
|
||||
output.send( RelocateShard( keys, unhealthy ? SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY : SERVER_KNOBS->PRIORITY_RECOVER_MOVE ) );
|
||||
}
|
||||
wait( yield(TaskPriority::DataDistribution) );
|
||||
}
|
||||
|
|
|
@ -38,33 +38,6 @@ struct RelocateShard {
|
|||
RelocateShard( KeyRange const& keys, int priority ) : keys(keys), priority(priority) {}
|
||||
};
|
||||
|
||||
// Higher priorities are executed first
|
||||
// Priority/100 is the "priority group"/"superpriority". Priority inversion
|
||||
// is possible within but not between priority groups; fewer priority groups
|
||||
// mean better worst case time bounds
|
||||
enum {
|
||||
PRIORITY_REBALANCE_SHARD = 100,
|
||||
PRIORITY_RECOVER_MOVE = 110,
|
||||
PRIORITY_REBALANCE_UNDERUTILIZED_TEAM = 120,
|
||||
PRIORITY_REBALANCE_OVERUTILIZED_TEAM = 121,
|
||||
PRIORITY_TEAM_HEALTHY = 140,
|
||||
PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER = 150,
|
||||
|
||||
// Set removing_redundant_team priority lower than merge/split_shard_priority,
|
||||
// so that removing redundant teams does not block merge/split shards.
|
||||
PRIORITY_TEAM_REDUNDANT = 200,
|
||||
|
||||
PRIORITY_MERGE_SHARD = 340,
|
||||
PRIORITY_SPLIT_SHARD = 350,
|
||||
|
||||
PRIORITY_TEAM_UNHEALTHY = 800,
|
||||
PRIORITY_TEAM_2_LEFT = 809,
|
||||
|
||||
PRIORITY_TEAM_1_LEFT = 900,
|
||||
|
||||
PRIORITY_TEAM_0_LEFT = 999
|
||||
};
|
||||
|
||||
enum {
|
||||
SOME_SHARED = 2,
|
||||
NONE_SHARED = 3
|
||||
|
|
|
@ -37,6 +37,9 @@
|
|||
struct RelocateData {
|
||||
KeyRange keys;
|
||||
int priority;
|
||||
int boundaryPriority;
|
||||
int healthPriority;
|
||||
|
||||
double startTime;
|
||||
UID randomId;
|
||||
int workFactor;
|
||||
|
@ -45,34 +48,42 @@ struct RelocateData {
|
|||
bool wantsNewServers;
|
||||
TraceInterval interval;
|
||||
|
||||
RelocateData() : startTime(-1), priority(-1), workFactor(0), wantsNewServers(false), interval("QueuedRelocation") {}
|
||||
RelocateData( RelocateShard const& rs ) : keys(rs.keys), priority(rs.priority), startTime(now()), randomId(deterministicRandom()->randomUniqueID()), workFactor(0),
|
||||
RelocateData() : startTime(-1), priority(-1), boundaryPriority(-1), healthPriority(-1), workFactor(0), wantsNewServers(false), interval("QueuedRelocation") {}
|
||||
explicit RelocateData( RelocateShard const& rs ) : keys(rs.keys), priority(rs.priority), boundaryPriority(isBoundaryPriority(rs.priority) ? rs.priority : -1), healthPriority(isHealthPriority(rs.priority) ? rs.priority : -1), startTime(now()), randomId(deterministicRandom()->randomUniqueID()), workFactor(0),
|
||||
wantsNewServers(
|
||||
rs.priority == PRIORITY_REBALANCE_SHARD ||
|
||||
rs.priority == PRIORITY_REBALANCE_OVERUTILIZED_TEAM ||
|
||||
rs.priority == PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
|
||||
rs.priority == PRIORITY_SPLIT_SHARD ||
|
||||
rs.priority == PRIORITY_TEAM_REDUNDANT ||
|
||||
rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ||
|
||||
rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
|
||||
rs.priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD ||
|
||||
rs.priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT ||
|
||||
mergeWantsNewServers(rs.keys, rs.priority)), interval("QueuedRelocation") {}
|
||||
|
||||
static bool mergeWantsNewServers(KeyRangeRef keys, int priority) {
|
||||
return priority == PRIORITY_MERGE_SHARD &&
|
||||
return priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD &&
|
||||
(SERVER_KNOBS->MERGE_ONTO_NEW_TEAM == 2 ||
|
||||
(SERVER_KNOBS->MERGE_ONTO_NEW_TEAM == 1 && keys.begin.startsWith(LiteralStringRef("\xff"))));
|
||||
}
|
||||
|
||||
static bool isHealthPriority(int priority) {
|
||||
return priority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY ||
|
||||
priority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT ||
|
||||
priority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT ||
|
||||
priority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT ||
|
||||
priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT ||
|
||||
priority == SERVER_KNOBS->PRIORITY_TEAM_HEALTHY ||
|
||||
priority == SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER;
|
||||
}
|
||||
|
||||
static bool isBoundaryPriority(int priority) {
|
||||
return priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD ||
|
||||
priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD;
|
||||
}
|
||||
|
||||
bool operator> (const RelocateData& rhs) const {
|
||||
return priority != rhs.priority ? priority > rhs.priority : ( startTime != rhs.startTime ? startTime < rhs.startTime : randomId > rhs.randomId );
|
||||
}
|
||||
|
||||
bool operator== (const RelocateData& rhs) const {
|
||||
return priority == rhs.priority && keys == rhs.keys && startTime == rhs.startTime && workFactor == rhs.workFactor && src == rhs.src && completeSources == rhs.completeSources && wantsNewServers == rhs.wantsNewServers && randomId == rhs.randomId;
|
||||
}
|
||||
|
||||
bool changesBoundaries() {
|
||||
return priority == PRIORITY_MERGE_SHARD ||
|
||||
priority == PRIORITY_SPLIT_SHARD ||
|
||||
priority == PRIORITY_RECOVER_MOVE;
|
||||
return priority == rhs.priority && boundaryPriority == rhs.boundaryPriority && healthPriority == rhs.healthPriority && keys == rhs.keys && startTime == rhs.startTime && workFactor == rhs.workFactor && src == rhs.src && completeSources == rhs.completeSources && wantsNewServers == rhs.wantsNewServers && randomId == rhs.randomId;
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -285,9 +296,9 @@ int getWorkFactor( RelocateData const& relocation ) {
|
|||
// Avoid the divide by 0!
|
||||
ASSERT( relocation.src.size() );
|
||||
|
||||
if( relocation.priority >= PRIORITY_TEAM_1_LEFT )
|
||||
if( relocation.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || relocation.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT )
|
||||
return WORK_FULL_UTILIZATION / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
|
||||
else if( relocation.priority >= PRIORITY_TEAM_2_LEFT )
|
||||
else if( relocation.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT )
|
||||
return WORK_FULL_UTILIZATION / 2 / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
|
||||
else // for now we assume that any message at a lower priority can best be assumed to have a full team left for work
|
||||
return WORK_FULL_UTILIZATION / relocation.src.size() / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
|
||||
|
@ -384,26 +395,28 @@ struct DDQueueData {
|
|||
|
||||
std::map<int, int> priority_relocations;
|
||||
int unhealthyRelocations;
|
||||
void startRelocation(int priority) {
|
||||
void startRelocation(int priority, int healthPriority) {
|
||||
// Although PRIORITY_TEAM_REDUNDANT has lower priority than split and merge shard movement,
|
||||
// we must count it into unhealthyRelocations, because team removers rely on unhealthyRelocations to
|
||||
// ensure a team remover will not start before the previous one finishes removing a team and moving its data away
|
||||
// NOTE: split and merge shard have higher priority. If they have to wait for unhealthyRelocations = 0,
|
||||
// a deadlock may happen: split/merge shard waits for unhealthyRelocations while blocking team_redundant.
|
||||
if (priority >= PRIORITY_TEAM_UNHEALTHY || priority == PRIORITY_TEAM_REDUNDANT) {
|
||||
if (healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT ||
|
||||
healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT) {
|
||||
unhealthyRelocations++;
|
||||
rawProcessingUnhealthy->set(true);
|
||||
}
|
||||
priority_relocations[priority]++;
|
||||
}
|
||||
void finishRelocation(int priority) {
|
||||
if (priority >= PRIORITY_TEAM_UNHEALTHY || priority == PRIORITY_TEAM_REDUNDANT) {
|
||||
void finishRelocation(int priority, int healthPriority) {
|
||||
if (healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT ||
|
||||
healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT) {
|
||||
unhealthyRelocations--;
|
||||
ASSERT(unhealthyRelocations >= 0);
|
||||
if(unhealthyRelocations == 0) {
|
||||
rawProcessingUnhealthy->set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
priority_relocations[priority]--;
|
||||
}
|
||||
|
||||
|
@ -524,7 +537,7 @@ struct DDQueueData {
|
|||
state Transaction tr(cx);
|
||||
|
||||
// FIXME: is the merge case needed
|
||||
if( input.priority == PRIORITY_MERGE_SHARD ) {
|
||||
if( input.priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD ) {
|
||||
wait( delay( 0.5, decrementPriority(decrementPriority(TaskPriority::DataDistribution )) ) );
|
||||
} else {
|
||||
wait( delay( 0.0001, TaskPriority::DataDistributionLaunch ) );
|
||||
|
@ -586,10 +599,14 @@ struct DDQueueData {
|
|||
}
|
||||
|
||||
//This function cannot handle relocation requests which split a shard into three pieces
|
||||
void queueRelocation( RelocateData rd, std::set<UID> &serversToLaunchFrom ) {
|
||||
void queueRelocation( RelocateShard rs, std::set<UID> &serversToLaunchFrom ) {
|
||||
//TraceEvent("QueueRelocationBegin").detail("Begin", rd.keys.begin).detail("End", rd.keys.end);
|
||||
|
||||
// remove all items from both queues that are fully contained in the new relocation (i.e. will be overwritten)
|
||||
RelocateData rd(rs);
|
||||
bool hasHealthPriority = RelocateData::isHealthPriority( rd.priority );
|
||||
bool hasBoundaryPriority = RelocateData::isBoundaryPriority( rd.priority );
|
||||
|
||||
auto ranges = queueMap.intersectingRanges( rd.keys );
|
||||
for(auto r = ranges.begin(); r != ranges.end(); ++r ) {
|
||||
RelocateData& rrs = r->value();
|
||||
|
@ -611,9 +628,13 @@ struct DDQueueData {
|
|||
if( foundActiveFetching || foundActiveRelocation ) {
|
||||
rd.wantsNewServers |= rrs.wantsNewServers;
|
||||
rd.startTime = std::min( rd.startTime, rrs.startTime );
|
||||
if ((rrs.priority >= PRIORITY_TEAM_UNHEALTHY || rrs.priority == PRIORITY_TEAM_REDUNDANT) &&
|
||||
rd.changesBoundaries())
|
||||
rd.priority = std::max( rd.priority, rrs.priority );
|
||||
if(!hasHealthPriority) {
|
||||
rd.healthPriority = std::max(rd.healthPriority, rrs.healthPriority);
|
||||
}
|
||||
if(!hasBoundaryPriority) {
|
||||
rd.boundaryPriority = std::max(rd.boundaryPriority, rrs.boundaryPriority);
|
||||
}
|
||||
rd.priority = std::max(rd.priority, std::max(rd.boundaryPriority, rd.healthPriority));
|
||||
}
|
||||
|
||||
if( rd.keys.contains( rrs.keys ) ) {
|
||||
|
@ -631,7 +652,7 @@ struct DDQueueData {
|
|||
/*TraceEvent(rrs.interval.end(), mi.id()).detail("Result","Cancelled")
|
||||
.detail("WasFetching", foundActiveFetching).detail("Contained", rd.keys.contains( rrs.keys ));*/
|
||||
queuedRelocations--;
|
||||
finishRelocation(rrs.priority);
|
||||
finishRelocation(rrs.priority, rrs.healthPriority);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -658,7 +679,7 @@ struct DDQueueData {
|
|||
.detail("KeyBegin", rrs.keys.begin).detail("KeyEnd", rrs.keys.end)
|
||||
.detail("Priority", rrs.priority).detail("WantsNewServers", rrs.wantsNewServers);*/
|
||||
queuedRelocations++;
|
||||
startRelocation(rrs.priority);
|
||||
startRelocation(rrs.priority, rrs.healthPriority);
|
||||
|
||||
fetchingSourcesQueue.insert( rrs );
|
||||
getSourceActors.insert( rrs.keys, getSourceServersForRange( cx, rrs, fetchSourceServersComplete ) );
|
||||
|
@ -678,7 +699,7 @@ struct DDQueueData {
|
|||
.detail("KeyBegin", newData.keys.begin).detail("KeyEnd", newData.keys.end)
|
||||
.detail("Priority", newData.priority).detail("WantsNewServers", newData.wantsNewServers);*/
|
||||
queuedRelocations++;
|
||||
startRelocation(newData.priority);
|
||||
startRelocation(newData.priority, newData.healthPriority);
|
||||
foundActiveRelocation = true;
|
||||
}
|
||||
|
||||
|
@ -773,7 +794,7 @@ struct DDQueueData {
|
|||
for(auto it = intersectingInFlight.begin(); it != intersectingInFlight.end(); ++it) {
|
||||
if (fetchKeysComplete.count(it->value()) && inFlightActors.liveActorAt(it->range().begin) &&
|
||||
!rd.keys.contains(it->range()) && it->value().priority >= rd.priority &&
|
||||
rd.priority < PRIORITY_TEAM_UNHEALTHY) {
|
||||
rd.healthPriority < SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY) {
|
||||
/*TraceEvent("OverlappingInFlight", distributorId)
|
||||
.detail("KeyBegin", it->value().keys.begin)
|
||||
.detail("KeyEnd", it->value().keys.end)
|
||||
|
@ -813,7 +834,7 @@ struct DDQueueData {
|
|||
|
||||
//TraceEvent(rd.interval.end(), distributorId).detail("Result","Success");
|
||||
queuedRelocations--;
|
||||
finishRelocation(rd.priority);
|
||||
finishRelocation(rd.priority, rd.healthPriority);
|
||||
|
||||
// now we are launching: remove this entry from the queue of all the src servers
|
||||
for( int i = 0; i < rd.src.size(); i++ ) {
|
||||
|
@ -841,7 +862,7 @@ struct DDQueueData {
|
|||
|
||||
launch( rrs, busymap );
|
||||
activeRelocations++;
|
||||
startRelocation(rrs.priority);
|
||||
startRelocation(rrs.priority, rrs.healthPriority);
|
||||
inFlightActors.insert( rrs.keys, dataDistributionRelocator( this, rrs ) );
|
||||
}
|
||||
|
||||
|
@ -912,10 +933,10 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
|
|||
bestTeams.clear();
|
||||
while( tciIndex < self->teamCollections.size() ) {
|
||||
double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY;
|
||||
if(rd.priority >= PRIORITY_TEAM_UNHEALTHY) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY;
|
||||
if(rd.priority >= PRIORITY_TEAM_1_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
|
||||
if(rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY;
|
||||
if(rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
|
||||
|
||||
auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, inflightPenalty);
|
||||
auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, inflightPenalty);
|
||||
req.sources = rd.src;
|
||||
req.completeSources = rd.completeSources;
|
||||
Optional<Reference<IDataDistributionTeam>> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)));
|
||||
|
@ -1154,7 +1175,7 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
|
|||
std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
|
||||
for( int i = 0; i < shards.size(); i++ ) {
|
||||
if( moveShard == shards[i] ) {
|
||||
TraceEvent(priority == PRIORITY_REBALANCE_OVERUTILIZED_TEAM ? "BgDDMountainChopper" : "BgDDValleyFiller", self->distributorId)
|
||||
TraceEvent(priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ? "BgDDMountainChopper" : "BgDDValleyFiller", self->distributorId)
|
||||
.detail("SourceBytes", sourceBytes)
|
||||
.detail("DestBytes", destBytes)
|
||||
.detail("ShardBytes", metrics.bytes)
|
||||
|
@ -1197,7 +1218,7 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
|
|||
std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
|
||||
continue;
|
||||
}
|
||||
if (self->priority_relocations[PRIORITY_REBALANCE_OVERUTILIZED_TEAM] <
|
||||
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] <
|
||||
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
|
||||
state Optional<Reference<IDataDistributionTeam>> randomTeam = wait(brokenPromiseToNever(
|
||||
self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, true))));
|
||||
|
@ -1208,7 +1229,7 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
|
|||
GetTeamRequest(true, true, false))));
|
||||
if (loadedTeam.present()) {
|
||||
bool moved =
|
||||
wait(rebalanceTeams(self, PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.get(),
|
||||
wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.get(),
|
||||
randomTeam.get(), teamCollectionIndex == 0));
|
||||
if (moved) {
|
||||
resetCount = 0;
|
||||
|
@ -1266,7 +1287,7 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
|
|||
std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
|
||||
continue;
|
||||
}
|
||||
if (self->priority_relocations[PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] <
|
||||
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] <
|
||||
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
|
||||
state Optional<Reference<IDataDistributionTeam>> randomTeam = wait(brokenPromiseToNever(
|
||||
self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, false))));
|
||||
|
@ -1276,7 +1297,7 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
|
|||
if (unloadedTeam.present()) {
|
||||
if (unloadedTeam.get()->getMinFreeSpaceRatio() > SERVER_KNOBS->FREE_SPACE_RATIO_DD_CUTOFF) {
|
||||
bool moved =
|
||||
wait(rebalanceTeams(self, PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.get(),
|
||||
wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.get(),
|
||||
unloadedTeam.get(), teamCollectionIndex == 0));
|
||||
if (moved) {
|
||||
resetCount = 0;
|
||||
|
@ -1382,7 +1403,7 @@ ACTOR Future<Void> dataDistributionQueue(
|
|||
}
|
||||
when ( RelocateData done = waitNext( self.relocationComplete.getFuture() ) ) {
|
||||
self.activeRelocations--;
|
||||
self.finishRelocation(done.priority);
|
||||
self.finishRelocation(done.priority, done.healthPriority);
|
||||
self.fetchKeysComplete.erase( done );
|
||||
//self.logRelocation( done, "ShardRelocatorDone" );
|
||||
actors.add( tag( delay(0, TaskPriority::DataDistributionLaunch), done.keys, rangesComplete ) );
|
||||
|
@ -1400,24 +1421,32 @@ ACTOR Future<Void> dataDistributionQueue(
|
|||
|
||||
recordMetrics = delay(SERVER_KNOBS->DD_QUEUE_LOGGING_INTERVAL);
|
||||
|
||||
int lowPriorityRelocations = 0, highPriorityRelocations = 0, highestPriorityRelocation = 0;
|
||||
int highestPriorityRelocation = 0;
|
||||
for( auto it = self.priority_relocations.begin(); it != self.priority_relocations.end(); ++it ) {
|
||||
if (it->second)
|
||||
if (it->second) {
|
||||
highestPriorityRelocation = std::max(highestPriorityRelocation, it->first);
|
||||
if( it->first < 200 )
|
||||
lowPriorityRelocations += it->second;
|
||||
else
|
||||
highPriorityRelocations += it->second;
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("MovingData", distributorId)
|
||||
.detail( "InFlight", self.activeRelocations )
|
||||
.detail( "InQueue", self.queuedRelocations )
|
||||
.detail( "AverageShardSize", req.getFuture().isReady() ? req.getFuture().get() : -1 )
|
||||
.detail( "LowPriorityRelocations", lowPriorityRelocations )
|
||||
.detail( "HighPriorityRelocations", highPriorityRelocations )
|
||||
.detail( "UnhealthyRelocations", self.unhealthyRelocations )
|
||||
.detail( "HighestPriority", highestPriorityRelocation )
|
||||
.detail( "BytesWritten", self.bytesWritten )
|
||||
.detail( "PriorityRecoverMove", self.priority_relocations[SERVER_KNOBS->PRIORITY_RECOVER_MOVE] )
|
||||
.detail( "PriorityRebalanceUnderutilizedTeam", self.priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] )
|
||||
.detail( "PriorityRebalannceOverutilizedTeam", self.priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] )
|
||||
.detail( "PriorityTeamHealthy", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_HEALTHY] )
|
||||
.detail( "PriorityTeamContainsUndesiredServer", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER] )
|
||||
.detail( "PriorityTeamRedundant", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT] )
|
||||
.detail( "PriorityMergeShard", self.priority_relocations[SERVER_KNOBS->PRIORITY_MERGE_SHARD] )
|
||||
.detail( "PriorityTeamUnhealthy", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY] )
|
||||
.detail( "PriorityTeam2Left", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_2_LEFT] )
|
||||
.detail( "PriorityTeam1Left", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_1_LEFT] )
|
||||
.detail( "PriorityTeam0Left", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_0_LEFT] )
|
||||
.detail( "PrioritySplitShard", self.priority_relocations[SERVER_KNOBS->PRIORITY_SPLIT_SHARD] )
|
||||
.trackLatest( "MovingData" );
|
||||
}
|
||||
when ( wait( self.error.getFuture() ) ) {} // Propagate errors from dataDistributionRelocator
|
||||
|
|
|
@ -369,12 +369,12 @@ ACTOR Future<Void> shardSplitter(
|
|||
for( int i = 0; i < skipRange; i++ ) {
|
||||
KeyRangeRef r(splitKeys[i], splitKeys[i+1]);
|
||||
self->shardsAffectedByTeamFailure->defineShard( r );
|
||||
self->output.send( RelocateShard( r, PRIORITY_SPLIT_SHARD) );
|
||||
self->output.send( RelocateShard( r, SERVER_KNOBS->PRIORITY_SPLIT_SHARD) );
|
||||
}
|
||||
for( int i = numShards-1; i > skipRange; i-- ) {
|
||||
KeyRangeRef r(splitKeys[i], splitKeys[i+1]);
|
||||
self->shardsAffectedByTeamFailure->defineShard( r );
|
||||
self->output.send( RelocateShard( r, PRIORITY_SPLIT_SHARD) );
|
||||
self->output.send( RelocateShard( r, SERVER_KNOBS->PRIORITY_SPLIT_SHARD) );
|
||||
}
|
||||
|
||||
self->sizeChanges.add( changeSizes( self, keys, shardSize->get().get().bytes ) );
|
||||
|
@ -475,7 +475,7 @@ Future<Void> shardMerger(
|
|||
|
||||
restartShardTrackers( self, mergeRange, endingStats );
|
||||
self->shardsAffectedByTeamFailure->defineShard( mergeRange );
|
||||
self->output.send( RelocateShard( mergeRange, PRIORITY_MERGE_SHARD ) );
|
||||
self->output.send( RelocateShard( mergeRange, SERVER_KNOBS->PRIORITY_MERGE_SHARD ) );
|
||||
|
||||
// We are about to be cancelled by the call to restartShardTrackers
|
||||
return Void();
|
||||
|
|
|
@ -105,6 +105,19 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( INFLIGHT_PENALTY_UNHEALTHY, 10.0 );
|
||||
init( INFLIGHT_PENALTY_ONE_LEFT, 1000.0 );
|
||||
init( MERGE_ONTO_NEW_TEAM, 1 ); if( randomize && BUGGIFY ) MERGE_ONTO_NEW_TEAM = deterministicRandom()->coinflip() ? 0 : 2;
|
||||
|
||||
init( PRIORITY_RECOVER_MOVE, 110 );
|
||||
init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 );
|
||||
init( PRIORITY_REBALANCE_OVERUTILIZED_TEAM, 121 );
|
||||
init( PRIORITY_TEAM_HEALTHY, 140 );
|
||||
init( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER, 150 );
|
||||
init( PRIORITY_TEAM_REDUNDANT, 200 );
|
||||
init( PRIORITY_MERGE_SHARD, 340 );
|
||||
init( PRIORITY_TEAM_UNHEALTHY, 700 );
|
||||
init( PRIORITY_TEAM_2_LEFT, 709 );
|
||||
init( PRIORITY_TEAM_1_LEFT, 800 );
|
||||
init( PRIORITY_TEAM_0_LEFT, 809 );
|
||||
init( PRIORITY_SPLIT_SHARD, 900 ); if( randomize && BUGGIFY ) PRIORITY_SPLIT_SHARD = 350;
|
||||
|
||||
// Data distribution
|
||||
init( RETRY_RELOCATESHARD_DELAY, 0.1 );
|
||||
|
|
|
@ -106,6 +106,24 @@ public:
|
|||
double INFLIGHT_PENALTY_ONE_LEFT;
|
||||
int MERGE_ONTO_NEW_TEAM; // Merges will request new servers. 0 for off, 1 for \xff only, 2 for all shards.
|
||||
|
||||
// Higher priorities are executed first
|
||||
// Priority/100 is the "priority group"/"superpriority". Priority inversion
|
||||
// is possible within but not between priority groups; fewer priority groups
|
||||
// mean better worst case time bounds
|
||||
// Maximum allowable priority is 999.
|
||||
int PRIORITY_RECOVER_MOVE;
|
||||
int PRIORITY_REBALANCE_UNDERUTILIZED_TEAM;
|
||||
int PRIORITY_REBALANCE_OVERUTILIZED_TEAM;
|
||||
int PRIORITY_TEAM_HEALTHY;
|
||||
int PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER;
|
||||
int PRIORITY_TEAM_REDUNDANT;
|
||||
int PRIORITY_MERGE_SHARD;
|
||||
int PRIORITY_TEAM_UNHEALTHY;
|
||||
int PRIORITY_TEAM_2_LEFT;
|
||||
int PRIORITY_TEAM_1_LEFT;
|
||||
int PRIORITY_TEAM_0_LEFT;
|
||||
int PRIORITY_SPLIT_SHARD;
|
||||
|
||||
// Data distribution
|
||||
double RETRY_RELOCATESHARD_DELAY;
|
||||
double DATA_DISTRIBUTION_FAILURE_REACTION_TIME;
|
||||
|
|
|
@ -1342,10 +1342,10 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
|
|||
bool primary = inFlight.getInt("Primary");
|
||||
int highestPriority = inFlight.getInt("HighestPriority");
|
||||
|
||||
if (movingHighestPriority < PRIORITY_TEAM_REDUNDANT) {
|
||||
if (movingHighestPriority < SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT) {
|
||||
highestPriority = movingHighestPriority;
|
||||
} else if (partitionsInFlight > 0) {
|
||||
highestPriority = std::max<int>(highestPriority, PRIORITY_MERGE_SHARD);
|
||||
highestPriority = std::max<int>(highestPriority, SERVER_KNOBS->PRIORITY_MERGE_SHARD);
|
||||
}
|
||||
|
||||
JsonBuilderObject team_tracker;
|
||||
|
@ -1354,7 +1354,7 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
|
|||
team_tracker.setKeyRawNumber("unhealthy_servers",inFlight.getValue("UnhealthyServers"));
|
||||
|
||||
JsonBuilderObject stateSectionObj;
|
||||
if (highestPriority >= PRIORITY_TEAM_0_LEFT) {
|
||||
if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) {
|
||||
stateSectionObj["healthy"] = false;
|
||||
stateSectionObj["name"] = "missing_data";
|
||||
stateSectionObj["description"] = "No replicas remain of some data";
|
||||
|
@ -1363,7 +1363,7 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
|
|||
*minReplicasRemaining = 0;
|
||||
}
|
||||
}
|
||||
else if (highestPriority >= PRIORITY_TEAM_1_LEFT) {
|
||||
else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_1_LEFT) {
|
||||
stateSectionObj["healthy"] = false;
|
||||
stateSectionObj["name"] = "healing";
|
||||
stateSectionObj["description"] = "Only one replica remains of some data";
|
||||
|
@ -1372,7 +1372,7 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
|
|||
*minReplicasRemaining = 1;
|
||||
}
|
||||
}
|
||||
else if (highestPriority >= PRIORITY_TEAM_2_LEFT) {
|
||||
else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) {
|
||||
stateSectionObj["healthy"] = false;
|
||||
stateSectionObj["name"] = "healing";
|
||||
stateSectionObj["description"] = "Only two replicas remain of some data";
|
||||
|
@ -1381,26 +1381,26 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
|
|||
*minReplicasRemaining = 2;
|
||||
}
|
||||
}
|
||||
else if (highestPriority >= PRIORITY_TEAM_UNHEALTHY) {
|
||||
else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY) {
|
||||
stateSectionObj["healthy"] = false;
|
||||
stateSectionObj["name"] = "healing";
|
||||
stateSectionObj["description"] = "Restoring replication factor";
|
||||
} else if (highestPriority >= PRIORITY_MERGE_SHARD) {
|
||||
} else if (highestPriority >= SERVER_KNOBS->PRIORITY_MERGE_SHARD) {
|
||||
stateSectionObj["healthy"] = true;
|
||||
stateSectionObj["name"] = "healthy_repartitioning";
|
||||
stateSectionObj["description"] = "Repartitioning.";
|
||||
} else if (highestPriority >= PRIORITY_TEAM_REDUNDANT) {
|
||||
} else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT) {
|
||||
stateSectionObj["healthy"] = true;
|
||||
stateSectionObj["name"] = "optimizing_team_collections";
|
||||
stateSectionObj["description"] = "Optimizing team collections";
|
||||
} else if (highestPriority >= PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER) {
|
||||
} else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER) {
|
||||
stateSectionObj["healthy"] = true;
|
||||
stateSectionObj["name"] = "healthy_removing_server";
|
||||
stateSectionObj["description"] = "Removing storage server";
|
||||
} else if (highestPriority == PRIORITY_TEAM_HEALTHY) {
|
||||
} else if (highestPriority == SERVER_KNOBS->PRIORITY_TEAM_HEALTHY) {
|
||||
stateSectionObj["healthy"] = true;
|
||||
stateSectionObj["name"] = "healthy";
|
||||
} else if (highestPriority >= PRIORITY_REBALANCE_SHARD) {
|
||||
} else if (highestPriority >= SERVER_KNOBS->PRIORITY_RECOVER_MOVE) {
|
||||
stateSectionObj["healthy"] = true;
|
||||
stateSectionObj["name"] = "healthy_rebalancing";
|
||||
stateSectionObj["description"] = "Rebalancing";
|
||||
|
|
|
@ -44,7 +44,7 @@ struct DDMetricsWorkload : TestWorkload {
|
|||
TraceEventFields md = wait( timeoutError(masterWorker.eventLogRequest.getReply(
|
||||
EventLogRequest( LiteralStringRef( "MovingData" ) ) ), 1.0 ) );
|
||||
int relocations;
|
||||
sscanf(md.getValue("HighPriorityRelocations").c_str(), "%d", &relocations);
|
||||
sscanf(md.getValue("UnhealthyRelocations").c_str(), "%d", &relocations);
|
||||
return relocations;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue