fix: if a destination team became unhealthy and then healthy again, it would lower the priority of a move even though the source servers we are moving from are still unhealthy

fix: badTeams were not accounted for when checking priorities
This commit is contained in:
Evan Tschannen 2018-11-11 12:33:31 -08:00
parent ebac7008f8
commit cd188a351e
4 changed files with 92 additions and 78 deletions

View File

@ -1466,57 +1466,52 @@ ACTOR Future<Void> teamTracker( DDTeamCollection* self, Reference<TCTeamInfo> te
for(int i=0; i<shards.size(); i++) { for(int i=0; i<shards.size(); i++) {
int maxPriority = team->getPriority(); int maxPriority = team->getPriority();
int maxOtherPriority = maxPriority;
if(maxPriority < PRIORITY_TEAM_0_LEFT) { if(maxPriority < PRIORITY_TEAM_0_LEFT) {
auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] ); auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
for( int t=0; t<teams.size(); t++) { for( int j=0; j < teams.first.size()+teams.second.size(); j++) {
if( teams[t].servers.size() ) { auto& t = j < teams.first.size() ? teams.first[j] : teams.second[j-teams.first.size()];
bool foundServer = false; if( !t.servers.size() ) {
for(auto tc : self->teamCollections) { maxPriority = PRIORITY_TEAM_0_LEFT;
if( tc->server_info.count( teams[t].servers[0] ) ) { break;
ASSERT(!foundServer); }
foundServer = true;
auto& info = tc->server_info[teams[t].servers[0]];
bool found = false; auto tc = self->teamCollections[t.primary ? 0 : 1];
for( int i = 0; i < info->teams.size(); i++ ) { ASSERT(tc->primary == t.primary);
if( info->teams[i]->serverIDs == teams[t].servers ) { if( tc->server_info.count( t.servers[0] ) ) {
if(teams[t].primary == self->primary) { auto& info = tc->server_info[t.servers[0]];
maxPriority = std::max( maxPriority, info->teams[i]->getPriority() );
} else {
maxOtherPriority = std::max( maxOtherPriority, info->teams[i]->getPriority() );
}
found = true;
break;
}
}
TEST(!found); // A removed team is still associated with a shard in SABTF bool found = false;
for( int k = 0; k < info->teams.size(); k++ ) {
if( info->teams[k]->serverIDs == t.servers ) {
maxPriority = std::max( maxPriority, info->teams[k]->getPriority() );
found = true;
break;
} }
} }
TEST(!foundServer); // A removed server is still associated with a team in SABTF
//If we cannot find the team, it could be a bad team so assume unhealthy priority
if(!found) {
maxPriority = std::max<int>( maxPriority, PRIORITY_TEAM_UNHEALTHY );
}
} else {
TEST(true); // A removed server is still associated with a team in SABTF
} }
} }
} }
if( maxPriority == team->getPriority() || lastPriority > maxPriority ) { RelocateShard rs;
RelocateShard rs; rs.keys = shards[i];
rs.keys = shards[i]; rs.priority = maxPriority;
rs.priority = std::max(maxPriority,maxOtherPriority);
self->output.send(rs); self->output.send(rs);
if(g_random->random01() < 0.01) { if(g_random->random01() < 0.01) {
TraceEvent("SendRelocateToDDQx100", self->masterId) TraceEvent("SendRelocateToDDQx100", self->masterId)
.detail("Team", team->getDesc()) .detail("Team", team->getDesc())
.detail("KeyBegin", printable(rs.keys.begin)) .detail("KeyBegin", printable(rs.keys.begin))
.detail("KeyEnd", printable(rs.keys.end)) .detail("KeyEnd", printable(rs.keys.end))
.detail("Priority", rs.priority) .detail("Priority", rs.priority)
.detail("TeamFailedMachines", team->getServerIDs().size()-serversLeft) .detail("TeamFailedMachines", team->getServerIDs().size()-serversLeft)
.detail("TeamOKMachines", serversLeft); .detail("TeamOKMachines", serversLeft);
}
} else {
TraceEvent("RelocationNotSentToDDQ", self->masterId)
.detail("Team", team->getDesc());
} }
} }
} else { } else {
@ -1706,7 +1701,7 @@ ACTOR Future<Void> removeBadTeams(DDTeamCollection* self) {
} }
} }
Void _ = wait(self->addSubsetComplete.getFuture()); Void _ = wait(self->addSubsetComplete.getFuture());
TraceEvent("DDRemovingBadTeams", self->masterId); TraceEvent("DDRemovingBadTeams", self->masterId).detail("Primary", self->primary);
for(auto it : self->badTeams) { for(auto it : self->badTeams) {
it->tracker.cancel(); it->tracker.cancel();
} }
@ -2145,7 +2140,6 @@ ACTOR Future<Void> serverGetTeamRequests(TeamCollectionInterface tci, DDTeamColl
// Keep track of servers and teams -- serves requests for getRandomTeam // Keep track of servers and teams -- serves requests for getRandomTeam
ACTOR Future<Void> dataDistributionTeamCollection( ACTOR Future<Void> dataDistributionTeamCollection(
Reference<DDTeamCollection> teamCollection, Reference<DDTeamCollection> teamCollection,
std::vector<DDTeamCollection*> teamCollections,
Reference<InitialDataDistribution> initData, Reference<InitialDataDistribution> initData,
TeamCollectionInterface tci, TeamCollectionInterface tci,
Reference<AsyncVar<struct ServerDBInfo>> db) Reference<AsyncVar<struct ServerDBInfo>> db)
@ -2155,8 +2149,6 @@ ACTOR Future<Void> dataDistributionTeamCollection(
state PromiseStream<Void> serverRemoved; state PromiseStream<Void> serverRemoved;
state Future<Void> error = actorCollection( self->addActor.getFuture() ); state Future<Void> error = actorCollection( self->addActor.getFuture() );
self->teamCollections = teamCollections;
try { try {
Void _ = wait( DDTeamCollection::init( self, initData ) ); Void _ = wait( DDTeamCollection::init( self, initData ) );
initData = Reference<InitialDataDistribution>(); initData = Reference<InitialDataDistribution>();
@ -2415,6 +2407,11 @@ ACTOR Future<Void> dataDistribution(
if(configuration.usableRegions > 1) { if(configuration.usableRegions > 1) {
teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].remoteSrc, false)); teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].remoteSrc, false));
} }
if(g_network->isSimulated()) {
TraceEvent("DDInitShard").detail("Keys", printable(keys)).detail("PrimarySrc", describe(initData->shards[shard].primarySrc)).detail("RemoteSrc", describe(initData->shards[shard].remoteSrc))
.detail("PrimaryDest", describe(initData->shards[shard].primaryDest)).detail("RemoteDest", describe(initData->shards[shard].remoteDest));
}
shardsAffectedByTeamFailure->moveShard(keys, teams); shardsAffectedByTeamFailure->moveShard(keys, teams);
if(initData->shards[shard].hasDest) { if(initData->shards[shard].hasDest) {
// This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in // This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in
@ -2452,21 +2449,17 @@ ACTOR Future<Void> dataDistribution(
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) ); actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) ); actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
vector<Reference<DDTeamCollection>> teamCollections;
teamCollections.push_back( Reference<DDTeamCollection>( new DDTeamCollection(cx, mi.id(), lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) ) );
if (configuration.usableRegions > 1) {
teamCollections.push_back( Reference<DDTeamCollection>( new DDTeamCollection(cx, mi.id(), lock, output, shardsAffectedByTeamFailure, configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), serverChanges, readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy) ) );
}
vector<DDTeamCollection*> teamCollectionsPtrs; vector<DDTeamCollection*> teamCollectionsPtrs;
for(auto it : teamCollections) { Reference<DDTeamCollection> primaryTeamCollection( new DDTeamCollection(cx, mi.id(), lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) );
teamCollectionsPtrs.push_back(it.getPtr()); teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr());
}
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( teamCollections[0], teamCollectionsPtrs, initData, tcis[0], db ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
if (configuration.usableRegions > 1) { if (configuration.usableRegions > 1) {
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( teamCollections[1], teamCollectionsPtrs, initData, tcis[1], db ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) ); Reference<DDTeamCollection> remoteTeamCollection( new DDTeamCollection(cx, mi.id(), lock, output, shardsAffectedByTeamFailure, configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), serverChanges, readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy) );
teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
remoteTeamCollection->teamCollections = teamCollectionsPtrs;
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( remoteTeamCollection, initData, tcis[1], db ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
} }
primaryTeamCollection->teamCollections = teamCollectionsPtrs;
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( primaryTeamCollection, initData, tcis[0], db ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
actors.push_back(yieldPromiseStream(output.getFuture(), input)); actors.push_back(yieldPromiseStream(output.getFuture(), input));
Void _ = wait( waitForAll( actors ) ); Void _ = wait( waitForAll( actors ) );

View File

@ -154,9 +154,10 @@ public:
int getNumberOfShards( UID ssID ); int getNumberOfShards( UID ssID );
vector<KeyRange> getShardsFor( Team team ); vector<KeyRange> getShardsFor( Team team );
vector<Team> getTeamsFor( KeyRangeRef keys ); std::pair<vector<Team>,vector<Team>> getTeamsFor( KeyRangeRef keys );
void defineShard( KeyRangeRef keys ); void defineShard( KeyRangeRef keys );
void moveShard( KeyRangeRef keys, std::vector<Team> destinationTeam ); void moveShard( KeyRangeRef keys, std::vector<Team> destinationTeam );
void finishMove( KeyRangeRef keys );
void check(); void check();
private: private:
struct OrderByTeamKey { struct OrderByTeamKey {
@ -167,7 +168,7 @@ private:
} }
}; };
KeyRangeMap< vector<Team> > shard_teams; // A shard can be affected by the failure of multiple teams if it is a queued merge KeyRangeMap< std::pair<vector<Team>,vector<Team>> > shard_teams; // A shard can be affected by the failure of multiple teams if it is a queued merge
std::set< std::pair<Team,KeyRange>, OrderByTeamKey > team_shards; std::set< std::pair<Team,KeyRange>, OrderByTeamKey > team_shards;
std::map< UID, int > storageServerShards; std::map< UID, int > storageServerShards;

View File

@ -1030,6 +1030,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
} }
self->bytesWritten += metrics.bytes; self->bytesWritten += metrics.bytes;
self->shardsAffectedByTeamFailure->finishMove(rd.keys);
relocationComplete.send( rd ); relocationComplete.send( rd );
return Void(); return Void();
} else { } else {

View File

@ -701,7 +701,7 @@ int ShardsAffectedByTeamFailure::getNumberOfShards( UID ssID ) {
return storageServerShards[ssID]; return storageServerShards[ssID];
} }
vector<ShardsAffectedByTeamFailure::Team> ShardsAffectedByTeamFailure::getTeamsFor( KeyRangeRef keys ) { std::pair<vector<ShardsAffectedByTeamFailure::Team>,vector<ShardsAffectedByTeamFailure::Team>> ShardsAffectedByTeamFailure::getTeamsFor( KeyRangeRef keys ) {
return shard_teams[keys.begin]; return shard_teams[keys.begin];
} }
@ -720,14 +720,20 @@ void ShardsAffectedByTeamFailure::insert(Team team, KeyRange const& range) {
} }
void ShardsAffectedByTeamFailure::defineShard( KeyRangeRef keys ) { void ShardsAffectedByTeamFailure::defineShard( KeyRangeRef keys ) {
std::set<Team> teams; std::vector<Team> teams;
std::vector<Team> prevTeams;
auto rs = shard_teams.intersectingRanges(keys); auto rs = shard_teams.intersectingRanges(keys);
for(auto it = rs.begin(); it != rs.end(); ++it) { for(auto it = rs.begin(); it != rs.end(); ++it) {
for(auto t=it->value().begin(); t!=it->value().end(); ++t) { for(auto t=it->value().first.begin(); t!=it->value().first.end(); ++t) {
teams.insert( *t ); teams.push_back( *t );
erase(*t, it->range()); erase(*t, it->range());
} }
for(auto t=it->value().second.begin(); t!=it->value().second.end(); ++t) {
prevTeams.push_back( *t );
}
} }
uniquify(teams);
uniquify(prevTeams);
/*TraceEvent("ShardsAffectedByTeamFailureDefine") /*TraceEvent("ShardsAffectedByTeamFailureDefine")
.detail("KeyBegin", printable(keys.begin)) .detail("KeyBegin", printable(keys.begin))
@ -735,12 +741,12 @@ void ShardsAffectedByTeamFailure::defineShard( KeyRangeRef keys ) {
.detail("TeamCount", teams.size()); */ .detail("TeamCount", teams.size()); */
auto affectedRanges = shard_teams.getAffectedRangesAfterInsertion(keys); auto affectedRanges = shard_teams.getAffectedRangesAfterInsertion(keys);
shard_teams.insert( keys, vector<Team>(teams.begin(), teams.end()) ); shard_teams.insert( keys, std::make_pair(teams, prevTeams) );
for(auto r=affectedRanges.begin(); r != affectedRanges.end(); ++r) { for(auto r=affectedRanges.begin(); r != affectedRanges.end(); ++r) {
auto& teams = shard_teams[r->begin]; auto& t = shard_teams[r->begin];
for(auto t=teams.begin(); t!=teams.end(); ++t) { for(auto it=t.first.begin(); it!=t.first.end(); ++it) {
insert(*t, *r); insert(*it, *r);
} }
} }
check(); check();
@ -754,33 +760,39 @@ void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, std::vector<Team>
.detail("NewTeam", describe(destinationTeam));*/ .detail("NewTeam", describe(destinationTeam));*/
auto ranges = shard_teams.intersectingRanges( keys ); auto ranges = shard_teams.intersectingRanges( keys );
std::vector< std::pair<std::vector<Team>,KeyRange> > modifiedShards; std::vector< std::pair<std::pair<std::vector<Team>,std::vector<Team>>,KeyRange> > modifiedShards;
for(auto it = ranges.begin(); it != ranges.end(); ++it) { for(auto it = ranges.begin(); it != ranges.end(); ++it) {
if( keys.contains( it->range() ) ) { if( keys.contains( it->range() ) ) {
// erase the many teams that were associated with this one shard // erase the many teams that were associated with this one shard
for(auto t = it->value().begin(); t != it->value().end(); ++t) { for(auto t = it->value().first.begin(); t != it->value().first.end(); ++t) {
erase(*t, it->range()); erase(*t, it->range());
} }
// save this modification for later insertion // save this modification for later insertion
modifiedShards.push_back( std::pair<std::vector<Team>,KeyRange>( destinationTeams, it->range() ) ); std::vector<Team> prevTeams = it->value().second;
prevTeams.insert(prevTeams.end(), it->value().first.begin(), it->value().first.end());
uniquify(prevTeams);
modifiedShards.push_back( std::pair<std::pair<std::vector<Team>,std::vector<Team>>,KeyRange>( std::make_pair(destinationTeams, prevTeams), it->range() ) );
} else { } else {
// for each range that touches this move, add our team as affecting this range // for each range that touches this move, add our team as affecting this range
for(auto& team : destinationTeams) { for(auto& team : destinationTeams) {
insert(team, it->range()); insert(team, it->range());
// if we are not in the list of teams associated with this shard, add us in
auto& teams = it->value();
if( std::find( teams.begin(), teams.end(), team ) == teams.end() ) {
teams.push_back( team );
}
} }
// if we are not in the list of teams associated with this shard, add us in
auto& teams = it->value();
teams.second.insert(teams.second.end(), teams.first.begin(), teams.first.end());
uniquify(teams.second);
teams.first.insert(teams.first.end(), destinationTeams.begin(), destinationTeams.end());
uniquify(teams.first);
} }
} }
// we cannot modify the KeyRangeMap while iterating through it, so add saved modifications now // we cannot modify the KeyRangeMap while iterating through it, so add saved modifications now
for( int i = 0; i < modifiedShards.size(); i++ ) { for( int i = 0; i < modifiedShards.size(); i++ ) {
for( auto& t : modifiedShards[i].first) { for( auto& t : modifiedShards[i].first.first) {
insert(t, modifiedShards[i].second); insert(t, modifiedShards[i].second);
} }
shard_teams.insert( modifiedShards[i].second, modifiedShards[i].first ); shard_teams.insert( modifiedShards[i].second, modifiedShards[i].first );
@ -789,19 +801,26 @@ void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, std::vector<Team>
check(); check();
} }
void ShardsAffectedByTeamFailure::finishMove( KeyRangeRef keys ) {
auto ranges = shard_teams.containedRanges(keys);
for(auto it = ranges.begin(); it != ranges.end(); ++it) {
it.value().second.clear();
}
}
void ShardsAffectedByTeamFailure::check() { void ShardsAffectedByTeamFailure::check() {
if (EXPENSIVE_VALIDATION) { if (EXPENSIVE_VALIDATION) {
for(auto t = team_shards.begin(); t != team_shards.end(); ++t) { for(auto t = team_shards.begin(); t != team_shards.end(); ++t) {
auto i = shard_teams.rangeContaining(t->second.begin); auto i = shard_teams.rangeContaining(t->second.begin);
if (i->range() != t->second || if (i->range() != t->second ||
!std::count(i->value().begin(), i->value().end(), t->first)) !std::count(i->value().first.begin(), i->value().first.end(), t->first))
{ {
ASSERT(false); ASSERT(false);
} }
} }
auto rs = shard_teams.ranges(); auto rs = shard_teams.ranges();
for(auto i = rs.begin(); i != rs.end(); ++i) for(auto i = rs.begin(); i != rs.end(); ++i)
for(vector<Team>::iterator t = i->value().begin(); t != i->value().end(); ++t) for(vector<Team>::iterator t = i->value().first.begin(); t != i->value().first.end(); ++t)
if (!team_shards.count( std::make_pair( *t, i->range() ) )) { if (!team_shards.count( std::make_pair( *t, i->range() ) )) {
std::string teamDesc, shards; std::string teamDesc, shards;
for(int k=0; k<t->servers.size(); k++) for(int k=0; k<t->servers.size(); k++)