fix: shards affected by team failure did not properly handle separate teams for the remote and primary data centers

Evan Tschannen 2018-03-08 10:50:05 -08:00
parent 9d4cdc828b
commit fa7eaea7cf
4 changed files with 168 additions and 99 deletions
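The change in a nutshell: a team is no longer just a sorted vector of storage-server UIDs; it also records whether it serves the primary or the remote data center, and each shard is tracked against one team per region. A minimal stand-alone sketch of that idea (simplified stand-in types, not the actual fdbserver classes):

#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <vector>

using UID = uint64_t;     // simplified stand-in for FDB's UID type
using Key = std::string;  // simplified stand-in for FDB's Key type

// A team is a sorted list of storage servers plus the region it serves.
struct Team {
    std::vector<UID> servers;  // sorted
    bool primary = true;

    bool operator<(const Team& r) const {
        if (servers == r.servers) return primary < r.primary;
        return servers < r.servers;
    }
    bool operator==(const Team& r) const {
        return servers == r.servers && primary == r.primary;
    }
};

int main() {
    // One shard replicated in both regions is now tracked against two
    // distinct teams, so the primary-DC and remote-DC replica sets are
    // never conflated in the bookkeeping.
    std::map<Key, std::vector<Team>> shardTeams;
    shardTeams["shardBegin"] = {
        Team{{1, 2, 3}, /*primary=*/true},   // primary-DC replicas
        Team{{7, 8, 9}, /*primary=*/false},  // remote-DC replicas
    };

    // The same ordering/equality as the commit's Team struct keeps the two
    // regions apart when teams are stored in sets or maps.
    std::set<Team> distinct(shardTeams["shardBegin"].begin(),
                            shardTeams["shardBegin"].end());
    return distinct.size() == 2 ? 0 : 1;
}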


@@ -327,7 +327,7 @@ ACTOR Future<Void> storageServerFailureTracker(
}
// Read keyservers, return unique set of teams
ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Database cx, UID masterId, MoveKeysLock moveKeysLock ) {
ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Database cx, UID masterId, MoveKeysLock moveKeysLock, std::vector<Optional<Key>> remoteDcIds ) {
state Reference<InitialDataDistribution> result = Reference<InitialDataDistribution>(new InitialDataDistribution);
state Key beginKey = allKeys.begin;
@@ -335,8 +335,12 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Dat
state Transaction tr( cx );
state std::map<UID, Optional<Key>> server_dc;
state std::map<vector<UID>, std::pair<vector<UID>, vector<UID>>> team_cache;
//Get the server list in its own try/catch block since it modifies result. We don't want a subsequent failure causing entries to be duplicated
loop {
server_dc.clear();
succeeded = false;
try {
result->mode = 1;
@@ -364,6 +368,7 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Dat
for( int i = 0; i < serverList.get().size(); i++ ) {
auto ssi = decodeServerListValue( serverList.get()[i].value );
result->allServers.push_back( std::make_pair(ssi, id_data[ssi.locality.processId()].processClass) );
server_dc[ssi.id()] = ssi.locality.dcId();
}
break;
@@ -392,17 +397,56 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Dat
// for each range
for(int i = 0; i < keyServers.size() - 1; i++) {
KeyRangeRef keys( keyServers[i].key, keyServers[i+1].key );
ShardInfo info( keyServers[i].key );
decodeKeyServersValue( keyServers[i].value, src, dest );
std::pair<vector<UID>,vector<UID>> teams;
for(int j=0; j<src.size(); j++)
teams.first.push_back(src[j]);
for(int j=0; j<dest.size(); j++)
teams.second.push_back(dest[j]);
result->shards.push_back( keyRangeWith(keys, teams) );
result->teams.insert( teams.first );
if (dest.size())
result->teams.insert( teams.second );
if(remoteDcIds.size()) {
auto srcIter = team_cache.find(src);
if(srcIter == team_cache.end()) {
for(auto& id : src) {
auto& dc = server_dc[id];
if(std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
info.remoteSrc.push_back(id);
} else {
info.primarySrc.push_back(id);
}
}
result->primaryTeams.insert( info.primarySrc );
result->remoteTeams.insert( info.remoteSrc );
team_cache[src] = std::make_pair(info.primarySrc, info.remoteSrc);
} else {
info.primarySrc = srcIter->second.first;
info.remoteSrc = srcIter->second.second;
}
if(dest.size()) {
info.hasDest = true;
auto destIter = team_cache.find(dest);
if(destIter == team_cache.end()) {
for(auto& id : dest) {
auto& dc = server_dc[id];
if(std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
info.remoteDest.push_back(id);
} else {
info.primaryDest.push_back(id);
}
}
result->primaryTeams.insert( info.primaryDest );
result->remoteTeams.insert( info.remoteDest );
team_cache[dest] = std::make_pair(info.primaryDest, info.remoteDest);
} else {
info.primaryDest = destIter->second.first;
info.remoteDest = destIter->second.second;
}
}
} else {
info.primarySrc = src;
result->primaryTeams.insert( src );
if (dest.size()) {
info.hasDest = true;
info.primaryDest = dest;
result->primaryTeams.insert( dest );
}
}
result->shards.push_back( info );
}
ASSERT(keyServers.size() > 0);
@@ -420,7 +464,7 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Dat
}
// a dummy shard at the end with no keys or servers makes life easier for trackInitialShards()
result->shards.push_back( keyRangeWith(KeyRangeRef(allKeys.end,allKeys.end), std::pair<vector<UID>, vector<UID>>()) );
result->shards.push_back( ShardInfo(allKeys.end) );
return result;
}
@@ -480,6 +524,7 @@ struct DDTeamCollection {
std::vector<Optional<Key>> includedDCs;
Optional<std::vector<Optional<Key>>> otherTrackedDCs;
bool primary;
DDTeamCollection(
Database const& cx,
UID masterId,
@@ -490,12 +535,12 @@ struct DDTeamCollection {
std::vector<Optional<Key>> includedDCs,
Optional<std::vector<Optional<Key>>> otherTrackedDCs,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& serverChanges,
Future<Void> readyToStart, Reference<AsyncVar<bool>> zeroHealthyTeams )
Future<Void> readyToStart, Reference<AsyncVar<bool>> zeroHealthyTeams, bool primary )
:cx(cx), masterId(masterId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams( true ), teamBuilder( Void() ),
configuration(configuration), serverChanges(serverChanges),
initialFailureReactionDelay( delay( BUGGIFY ? 0 : SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskDataDistribution ) ), healthyTeamCount( 0 ),
initializationDoneActor(logOnCompletion(readyToStart && initialFailureReactionDelay, this)), optimalTeamCount( 0 ), recruitingStream(0), restartRecruiting( SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY ),
unhealthyServers(0), includedDCs(includedDCs), otherTrackedDCs(otherTrackedDCs), zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true)
unhealthyServers(0), includedDCs(includedDCs), otherTrackedDCs(otherTrackedDCs), zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true), primary(primary)
{
TraceEvent("DDTrackerStarting", masterId)
.detail( "State", "Inactive" )
@@ -759,9 +804,15 @@ struct DDTeamCollection {
}
}
for(auto t = initTeams.teams.begin(); t != initTeams.teams.end(); ++t) {
if(primary) {
for(auto t = initTeams.primaryTeams.begin(); t != initTeams.primaryTeams.end(); ++t) {
addTeam(t->begin(), t->end() );
}
} else {
for(auto t = initTeams.remoteTeams.begin(); t != initTeams.remoteTeams.end(); ++t) {
addTeam(t->begin(), t->end() );
}
}
addSubsetOfEmergencyTeams();
}
@@ -830,9 +881,6 @@ struct DDTeamCollection {
}
}
if(newTeamServers.empty()) {
return;
}
Reference<TCTeamInfo> teamInfo( new TCTeamInfo( newTeamServers ) );
TraceEvent("TeamCreation", masterId).detail("Team", teamInfo->getDesc());
teamInfo->tracker = teamTracker( this, teamInfo );
@@ -1290,20 +1338,20 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
lastZeroHealthy = self->zeroHealthyTeams->get(); //set this again in case it changed due to this team's health changing
if( self->initialFailureReactionDelay.isReady() && !self->zeroHealthyTeams->get() ) {
vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( team->getServerIDs() );
vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team(team->getServerIDs(), self->primary) );
for(int i=0; i<shards.size(); i++) {
int maxPriority = team->getPriority();
auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
for( int t=0; t<teams.size(); t++) {
ASSERT( teams[t].size() );
ASSERT( teams[t].servers.size() );
if( self->server_info.count( teams[t][0] ) ) {
auto& info = self->server_info[teams[t][0]];
if( self->server_info.count( teams[t].servers[0] ) ) {
auto& info = self->server_info[teams[t].servers[0]];
bool found = false;
for( int i = 0; i < info->teams.size(); i++ ) {
if( info->teams[i]->serverIDs == teams[t] ) {
if( info->teams[i]->serverIDs == teams[t].servers ) {
maxPriority = std::max( maxPriority, info->teams[i]->getPriority() );
found = true;
break;
@@ -1826,9 +1874,10 @@ ACTOR Future<Void> dataDistributionTeamCollection(
Optional<std::vector<Optional<Key>>> otherTrackedDCs,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges,
Future<Void> readyToStart,
Reference<AsyncVar<bool>> zeroHealthyTeams )
Reference<AsyncVar<bool>> zeroHealthyTeams,
bool primary)
{
state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, configuration, includedDCs, otherTrackedDCs, serverChanges, readyToStart, zeroHealthyTeams );
state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, configuration, includedDCs, otherTrackedDCs, serverChanges, readyToStart, zeroHealthyTeams, primary );
state Future<Void> loggingTrigger = Void();
state PromiseStream<Void> serverRemoved;
state Future<Void> interfaceChanges;
@@ -2113,9 +2162,9 @@ ACTOR Future<Void> dataDistribution(
TraceEvent("DDInitTakingMoveKeysLock", mi.id());
state MoveKeysLock lock = wait( takeMoveKeysLock( cx, mi.id() ) );
TraceEvent("DDInitTookMoveKeysLock", mi.id());
state Reference<InitialDataDistribution> initData = wait( getInitialDataDistribution(cx, mi.id(), lock) );
state Reference<InitialDataDistribution> initData = wait( getInitialDataDistribution(cx, mi.id(), lock, configuration.remoteTLogReplicationFactor > 0 ? remoteDcIds : std::vector<Optional<Key>>() ) );
if(initData->shards.size() > 1) {
TraceEvent("DDInitGotInitialDD", mi.id()).detail("b", printable(initData->shards.end()[-2].begin)).detail("e", printable(initData->shards.end()[-2].end)).detail("src", describe(initData->shards.end()[-2].value.first)).detail("dest", describe(initData->shards.end()[-2].value.second)).trackLatest("InitialDD");
TraceEvent("DDInitGotInitialDD", mi.id()).detail("b", printable(initData->shards.end()[-2].key)).detail("e", printable(initData->shards.end()[-1].key)).detail("src", describe(initData->shards.end()[-2].primarySrc)).detail("dest", describe(initData->shards.end()[-2].primaryDest)).trackLatest("InitialDD");
} else {
TraceEvent("DDInitGotInitialDD", mi.id()).detail("b","").detail("e", "").detail("src", "[no items]").detail("dest", "[no items]").trackLatest("InitialDD");
}
@@ -2168,13 +2217,29 @@ ACTOR Future<Void> dataDistribution(
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure );
for(int s=0; s<initData->shards.size() - 1; s++) {
KeyRangeRef keys = KeyRangeRef(initData->shards[s].key, initData->shards[s+1].key);
shardsAffectedByTeamFailure->defineShard(keys);
std::vector<ShardsAffectedByTeamFailure::Team> teams;
teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[s].primarySrc, true));
if(configuration.remoteTLogReplicationFactor > 0) {
teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[s].remoteSrc, false));
}
shardsAffectedByTeamFailure->moveShard(keys, teams);
if(initData->shards[s].hasDest) {
// This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in
// DataDistributionQueue to track it, but it's easier to just (with low priority) schedule it for movement.
output.send( RelocateShard( keys, PRIORITY_RECOVER_MOVE ) );
}
}
actors.push_back( pollMoveKeysLock(cx, lock) );
actors.push_back( popOldTags( cx, logSystem, recoveryCommitVersion) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, shardsAffectedByTeamFailure, output, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, configuration.durableStorageQuorum, lastLimited ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, configuration.remoteTLogReplicationFactor > 0 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0] ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, configuration.remoteTLogReplicationFactor > 0 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
if (configuration.remoteTLogReplicationFactor > 0) {
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[1] ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[1], false ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
}
Void _ = wait( waitForAll( actors ) );
@@ -2215,7 +2280,8 @@ DDTeamCollection* testTeamCollection(int teamSize, IRepPolicyRef policy, int pro
{},
PromiseStream<std::pair<UID, Optional<StorageServerInterface>>>(),
Future<Void>(Void()),
Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) )
Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) ),
true
);
for(int id = 1; id <= processCount; id++) {
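The getInitialDataDistribution changes above split every shard's source and destination server lists into a primary half and a remote half by looking up each server's data-center ID in server_dc, caching the result per server list in team_cache. A rough self-contained sketch of that partitioning step (splitByRegion is a hypothetical helper with simplified types, not code from the commit):

#include <algorithm>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

using UID = uint64_t;
using DcId = std::optional<std::string>;

// Hypothetical helper: partition one shard's servers into primary/remote
// halves, mirroring the loop in getInitialDataDistribution.
std::pair<std::vector<UID>, std::vector<UID>> splitByRegion(
        const std::vector<UID>& servers,
        const std::map<UID, DcId>& serverDc,
        const std::vector<DcId>& remoteDcIds) {
    std::vector<UID> primary, remote;
    for (UID id : servers) {
        const DcId& dc = serverDc.at(id);
        bool isRemote = std::find(remoteDcIds.begin(), remoteDcIds.end(), dc)
                        != remoteDcIds.end();
        (isRemote ? remote : primary).push_back(id);
    }
    return {primary, remote};
}

int main() {
    std::map<UID, DcId> serverDc = {{1, "dc1"}, {2, "dc1"}, {7, "dc2"}, {8, "dc2"}};
    std::vector<DcId> remoteDcIds = {DcId("dc2")};
    auto [primarySrc, remoteSrc] = splitByRegion({1, 2, 7, 8}, serverDc, remoteDcIds);
    // primarySrc == {1, 2}, remoteSrc == {7, 8}
    return (primarySrc.size() == 2 && remoteSrc.size() == 2) ? 0 : 1;
}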


@@ -120,7 +120,23 @@ struct TeamCollectionInterface {
class ShardsAffectedByTeamFailure : public ReferenceCounted<ShardsAffectedByTeamFailure> {
public:
ShardsAffectedByTeamFailure() {}
typedef vector<UID> Team; // sorted
struct Team {
vector<UID> servers; // sorted
bool primary;
Team() : primary(true) {}
Team(vector<UID> const& servers, bool primary) : servers(servers), primary(primary) {}
bool operator < ( const Team& r ) const {
if( servers == r.servers ) return primary < r.primary;
return servers < r.servers;
}
bool operator == ( const Team& r ) const {
return servers == r.servers && primary == r.primary;
}
};
// This tracks the data distribution on the data distribution server so that teamTrackers can
// relocate the right shards when a team is degraded.
@@ -138,9 +154,9 @@ public:
int getNumberOfShards( UID ssID );
vector<KeyRange> getShardsFor( Team team );
vector<vector<UID>> getTeamsFor( KeyRangeRef keys );
vector<Team> getTeamsFor( KeyRangeRef keys );
void defineShard( KeyRangeRef keys );
void moveShard( KeyRangeRef keys, Team destinationTeam );
void moveShard( KeyRangeRef keys, std::vector<Team> destinationTeam );
void check();
private:
struct OrderByTeamKey {
@@ -159,12 +175,23 @@ private:
void insert(Team team, KeyRange const& range);
};
struct ShardInfo {
Key key;
vector<UID> primarySrc;
vector<UID> remoteSrc;
vector<UID> primaryDest;
vector<UID> remoteDest;
bool hasDest;
explicit ShardInfo(Key key) : key(key), hasDest(false) {}
};
struct InitialDataDistribution : ReferenceCounted<InitialDataDistribution> {
typedef vector<UID> Team; // sorted
int mode;
vector<std::pair<StorageServerInterface, ProcessClass>> allServers;
std::set< Team > teams;
vector<KeyRangeWith<std::pair<Team, Team>>> shards;
std::set<vector<UID>> primaryTeams;
std::set<vector<UID>> remoteTeams;
vector<ShardInfo> shards;
};
Future<Void> dataDistribution(
@@ -180,7 +207,6 @@ Future<Void> dataDistribution(
Future<Void> dataDistributionTracker(
Reference<InitialDataDistribution> const& initData,
Database const& cx,
Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
PromiseStream<RelocateShard> const& output,
PromiseStream<GetMetricsRequest> const& getShardMetrics,
FutureStream<Promise<int64_t>> const& getAverageShardBytes,
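The header changes above replace the old KeyRangeWith<std::pair<Team, Team>> shard records with flat ShardInfo entries carrying the four region-separated server lists plus a hasDest flag, and a dummy record at the end of keyspace closes the last range. A compressed sketch of how consecutive records are read back as ranges (simplified types, not the fdbserver definitions):

#include <cstdint>
#include <string>
#include <vector>

using UID = uint64_t;
using Key = std::string;

// Mirrors the shape of the new ShardInfo record: one entry per shard
// boundary, with region-separated source and destination teams.
struct ShardInfo {
    Key key;                       // begin key; the next record's key is the end
    std::vector<UID> primarySrc;
    std::vector<UID> remoteSrc;
    std::vector<UID> primaryDest;
    std::vector<UID> remoteDest;
    bool hasDest = false;
    explicit ShardInfo(Key k) : key(std::move(k)) {}
};

int main() {
    std::vector<ShardInfo> shards;
    shards.emplace_back("");          // first shard starts at the empty key
    shards.back().primarySrc = {1, 2};
    shards.back().remoteSrc = {7, 8};
    shards.emplace_back("\xff");      // dummy record marking the end of keyspace

    // Walk consecutive records to recover [begin, end) ranges, as
    // dataDistribution does when it seeds shardsAffectedByTeamFailure.
    std::size_t realShards = 0;
    for (std::size_t s = 0; s + 1 < shards.size(); ++s) {
        Key begin = shards[s].key;
        Key end = shards[s + 1].key;  // range is [begin, end)
        // shards[s].primarySrc / remoteSrc say who currently holds the range;
        // hasDest marks a move that was already in flight at recovery.
        if (begin < end) ++realShards;
    }
    return realShards == 1 ? 0 : 1;
}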


@@ -837,6 +837,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
state bool signalledTransferComplete = false;
state UID masterId = self->mi.id();
state ParallelTCInfo destination;
state std::vector<ShardsAffectedByTeamFailure::Team> destinationTeams;
state ParallelTCInfo healthyDestinations;
state bool anyHealthy = false;
state int durableStorageQuorum = 0;
@@ -865,6 +866,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
state int tciIndex = 0;
state bool foundTeams = true;
destination.clear();
destinationTeams.clear();
healthyDestinations.clear();
anyHealthy = false;
durableStorageQuorum = 0;
@@ -883,6 +885,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
Optional<Reference<IDataDistributionTeam>> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)));
if (bestTeam.present()) {
destination.addTeam(bestTeam.get());
destinationTeams.push_back(ShardsAffectedByTeamFailure::Team(bestTeam.get()->getServerIDs(), tciIndex == 0));
if(bestTeam.get()->isHealthy()) {
healthyDestinations.addTeam(bestTeam.get());
anyHealthy = true;
@@ -912,7 +915,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
Void _ = wait( delay( SERVER_KNOBS->BEST_TEAM_STUCK_DELAY, TaskDataDistributionLaunch ) );
}
self->shardsAffectedByTeamFailure->moveShard(rd.keys, destination.getServerIDs());
self->shardsAffectedByTeamFailure->moveShard(rd.keys, destinationTeams);
//FIXME: do not add data in flight to servers that were already in the src.
destination.addDataInFlightToTeam(+metrics.bytes);
@@ -1009,12 +1012,12 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
}
}
ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<IDataDistributionTeam> sourceTeam, Reference<IDataDistributionTeam> destTeam ) {
ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<IDataDistributionTeam> sourceTeam, Reference<IDataDistributionTeam> destTeam, bool primary ) {
if(g_network->isSimulated() && g_simulator.speedUpSimulation) {
return false;
}
std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( sourceTeam->getServerIDs() );
std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
if( !shards.size() )
return false;
@@ -1028,7 +1031,7 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
return false;
//verify the shard is still in sabtf
std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( sourceTeam->getServerIDs() );
std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
for( int i = 0; i < shards.size(); i++ ) {
if( moveShard == shards[i] ) {
TraceEvent(priority == PRIORITY_REBALANCE_OVERUTILIZED_TEAM ? "BgDDMountainChopper" : "BgDDValleyFiller", self->mi.id())
@@ -1057,7 +1060,7 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
if( randomTeam.get()->getMinFreeSpaceRatio() > SERVER_KNOBS->FREE_SPACE_RATIO_DD_CUTOFF ) {
state Optional<Reference<IDataDistributionTeam>> loadedTeam = wait( brokenPromiseToNever( self->teamCollections[teamCollectionIndex].getTeam.getReply( GetTeamRequest( true, true, false ) ) ) );
if( loadedTeam.present() ) {
bool moved = wait( rebalanceTeams( self, PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.get(), randomTeam.get() ) );
bool moved = wait( rebalanceTeams( self, PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.get(), randomTeam.get(), teamCollectionIndex == 0 ) );
if(moved) {
resetCount = 0;
} else {
@@ -1092,7 +1095,7 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
state Optional<Reference<IDataDistributionTeam>> unloadedTeam = wait( brokenPromiseToNever( self->teamCollections[teamCollectionIndex].getTeam.getReply( GetTeamRequest( true, true, true ) ) ) );
if( unloadedTeam.present() ) {
if( unloadedTeam.get()->getMinFreeSpaceRatio() > SERVER_KNOBS->FREE_SPACE_RATIO_DD_CUTOFF ) {
bool moved = wait( rebalanceTeams( self, PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.get(), unloadedTeam.get() ) );
bool moved = wait( rebalanceTeams( self, PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.get(), unloadedTeam.get(), teamCollectionIndex == 0 ) );
if(moved) {
resetCount = 0;
} else {
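In the relocator hunk above, each team collection contributes one destination team and the collection index decides its region flag, with index 0 meaning the primary data center; the resulting pair is what moveShard now receives. A small illustrative sketch of that pairing (TeamSource is a hypothetical stand-in for a DDTeamCollection's getTeam request; simplified types):

#include <cstdint>
#include <functional>
#include <vector>

using UID = uint64_t;

struct Team {
    std::vector<UID> servers;
    bool primary = true;
};

// Stand-in for one DDTeamCollection: hands back some healthy team.
using TeamSource = std::function<std::vector<UID>()>;

// Mirrors the relocator loop: one destination team per collection,
// tciIndex == 0 meaning the primary data center.
std::vector<Team> chooseDestinationTeams(const std::vector<TeamSource>& teamCollections) {
    std::vector<Team> destinationTeams;
    for (std::size_t tciIndex = 0; tciIndex < teamCollections.size(); ++tciIndex) {
        destinationTeams.push_back(Team{teamCollections[tciIndex](), tciIndex == 0});
    }
    return destinationTeams;
}

int main() {
    std::vector<TeamSource> tcis = {
        [] { return std::vector<UID>{1, 2, 3}; },   // primary collection
        [] { return std::vector<UID>{7, 8, 9}; },   // remote collection
    };
    auto teams = chooseDestinationTeams(tcis);
    // teams[0].primary == true, teams[1].primary == false; the pair is then
    // handed to shardsAffectedByTeamFailure->moveShard(keys, teams).
    return (teams.size() == 2 && teams[0].primary && !teams[1].primary) ? 0 : 1;
}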


@@ -598,9 +598,7 @@ void restartShardTrackers( DataDistributionTracker* self, KeyRangeRef keys, Opti
}
}
ACTOR Future<Void> trackInitialShards(DataDistributionTracker *self,
Reference<InitialDataDistribution> initData,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure)
ACTOR Future<Void> trackInitialShards(DataDistributionTracker *self, Reference<InitialDataDistribution> initData)
{
TraceEvent("TrackInitialShards", self->masterId).detail("InitialShardCount", initData->shards.size());
@@ -608,35 +606,9 @@ ACTOR Future<Void> trackInitialShards(DataDistributionTracker *self,
//SOMEDAY: Figure out what this priority should actually be
Void _ = wait( delay( 0.0, TaskDataDistribution ) );
state int lastBegin = -1;
state vector<UID> last;
state int s;
for(s=0; s<initData->shards.size(); s++) {
state InitialDataDistribution::Team src = initData->shards[s].value.first;
auto& dest = initData->shards[s].value.second;
if (dest.size()) {
// This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in
// DataDistributionQueue to track it, but it's easier to just (with low priority) schedule it for movement.
self->output.send( RelocateShard( initData->shards[s], PRIORITY_RECOVER_MOVE ) );
}
// The following clause was here for no remembered reason. It was removed, however, because on resumption of stopped
// clusters (of size 3) it was grouping all the shards in the system into one, and then splitting them all back out,
// causing unnecessary data distribution.
//if (s==0 || s+1==initData.shards.size() || lastBegin<0 || src != last || initData.shards[s].begin == keyServersPrefix) {
// end current run, start a new shardTracker
// relies on the dummy shard at allkeysend
if (lastBegin >= 0) {
state KeyRangeRef keys( initData->shards[lastBegin].begin, initData->shards[s].begin );
restartShardTrackers( self, keys );
shardsAffectedByTeamFailure->defineShard( keys );
shardsAffectedByTeamFailure->moveShard( keys, last );
}
lastBegin = s;
last = src;
//}
for(s=0; s<initData->shards.size()-1; s++) {
restartShardTrackers( self, KeyRangeRef( initData->shards[s].key, initData->shards[s+1].key ) );
Void _ = wait( yield( TaskDataDistribution ) );
}
@@ -692,7 +664,6 @@ ACTOR Future<Void> fetchShardMetrics( DataDistributionTracker* self, GetMetricsR
ACTOR Future<Void> dataDistributionTracker(
Reference<InitialDataDistribution> initData,
Database cx,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<RelocateShard> output,
PromiseStream<GetMetricsRequest> getShardMetrics,
FutureStream<Promise<int64_t>> getAverageShardBytes,
@@ -703,7 +674,7 @@ ACTOR Future<Void> dataDistributionTracker(
state DataDistributionTracker self(cx, masterId, readyToStart, output, anyZeroHealthyTeams);
state Future<Void> loggingTrigger = Void();
try {
Void _ = wait( trackInitialShards( &self, initData, shardsAffectedByTeamFailure ) );
Void _ = wait( trackInitialShards( &self, initData ) );
initData = Reference<InitialDataDistribution>();
loop choose {
@@ -731,9 +702,7 @@ ACTOR Future<Void> dataDistributionTracker(
vector<KeyRange> ShardsAffectedByTeamFailure::getShardsFor( Team team ) {
vector<KeyRange> r;
for(auto it = team_shards.lower_bound( std::pair<Team,KeyRange>( team, KeyRangeRef() ) );
it != team_shards.end() && it->first == team;
++it)
for(auto it = team_shards.lower_bound( std::pair<Team,KeyRange>( team, KeyRangeRef() ) ); it != team_shards.end() && it->first == team; ++it)
r.push_back( it->second );
return r;
}
@@ -742,20 +711,20 @@ int ShardsAffectedByTeamFailure::getNumberOfShards( UID ssID ) {
return storageServerShards[ssID];
}
vector<vector<UID>> ShardsAffectedByTeamFailure::getTeamsFor( KeyRangeRef keys ) {
vector<ShardsAffectedByTeamFailure::Team> ShardsAffectedByTeamFailure::getTeamsFor( KeyRangeRef keys ) {
return shard_teams[keys.begin];
}
void ShardsAffectedByTeamFailure::erase(Team team, KeyRange const& range) {
if(team_shards.erase( std::pair<Team,KeyRange>(team, range) ) > 0) {
for(auto uid = team.begin(); uid != team.end(); ++uid)
for(auto uid = team.servers.begin(); uid != team.servers.end(); ++uid)
storageServerShards[*uid]--;
}
}
void ShardsAffectedByTeamFailure::insert(Team team, KeyRange const& range) {
if(team_shards.insert( std::pair<Team,KeyRange>( team, range ) ).second) {
for(auto uid = team.begin(); uid != team.end(); ++uid)
for(auto uid = team.servers.begin(); uid != team.servers.end(); ++uid)
storageServerShards[*uid]++;
}
}
@@ -787,7 +756,7 @@ void ShardsAffectedByTeamFailure::defineShard( KeyRangeRef keys ) {
check();
}
void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, Team destinationTeam ) {
void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, std::vector<Team> destinationTeams ) {
/*TraceEvent("ShardsAffectedByTeamFailureMove")
.detail("KeyBegin", printable(keys.begin))
.detail("KeyEnd", printable(keys.end))
@@ -795,31 +764,36 @@ void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, Team destinationT
.detail("NewTeam", describe(destinationTeam));*/
auto ranges = shard_teams.intersectingRanges( keys );
std::vector< std::pair<Team,KeyRange> > modifiedShards;
std::vector< std::pair<std::vector<Team>,KeyRange> > modifiedShards;
for(auto it = ranges.begin(); it != ranges.end(); ++it) {
if( keys.contains( it->range() ) ) {
// erase the many teams that were assiciated with this one shard
// erase the many teams that were associated with this one shard
for(auto t = it->value().begin(); t != it->value().end(); ++t) {
erase(*t, it->range());
}
// save this modification for later insertion
modifiedShards.push_back( std::pair<Team,KeyRange>( destinationTeam, it->range() ) );
modifiedShards.push_back( std::pair<std::vector<Team>,KeyRange>( destinationTeams, it->range() ) );
} else {
// for each range that touches this move, add our team as affecting this range
insert(destinationTeam, it->range());
for(auto& team : destinationTeams) {
insert(team, it->range());
// if we are not in the list of teams associated with this shard, add us in
auto& teams = it->value();
if( std::find( teams.begin(), teams.end(), destinationTeam ) == teams.end() )
teams.push_back( destinationTeam );
if( std::find( teams.begin(), teams.end(), team ) == teams.end() ) {
teams.push_back( team );
}
}
}
}
// we cannot modify the KeyRangeMap while iterating through it, so add saved modifications now
for( int i = 0; i < modifiedShards.size(); i++ ) {
insert(modifiedShards[i].first, modifiedShards[i].second);
shard_teams.insert( modifiedShards[i].second, vector<Team>( 1, modifiedShards[i].first ) );
for( auto& t : modifiedShards[i].first) {
insert(t, modifiedShards[i].second);
}
shard_teams.insert( modifiedShards[i].second, modifiedShards[i].first );
}
check();
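moveShard above distinguishes two cases: ranges fully contained in the moved keys have their team list replaced outright (the replacement is saved and applied after the iteration, since the map cannot be modified while it is being walked), while ranges that merely overlap get the destination teams appended if not already present. A compact sketch of that behaviour over a plain vector of shards (an assumed simplification standing in for FDB's KeyRangeMap):

#include <algorithm>
#include <string>
#include <vector>

using Key = std::string;

struct Team { std::vector<unsigned> servers; bool primary = true; };
bool operator==(const Team& a, const Team& b) {
    return a.servers == b.servers && a.primary == b.primary;
}

struct Shard { Key begin, end; std::vector<Team> teams; };

// Assumed simplified model: shards are disjoint, sorted ranges.
// Mirrors moveShard's two cases: replace the teams of fully covered
// shards, append the destination teams on partially overlapping ones.
void moveShard(std::vector<Shard>& shards, const Key& begin, const Key& end,
               const std::vector<Team>& destinationTeams) {
    for (Shard& s : shards) {
        bool overlaps = s.begin < end && begin < s.end;
        if (!overlaps) continue;
        bool contained = begin <= s.begin && s.end <= end;
        if (contained) {
            s.teams = destinationTeams;            // old teams dropped entirely
        } else {
            for (const Team& t : destinationTeams) // destination teams added on top
                if (std::find(s.teams.begin(), s.teams.end(), t) == s.teams.end())
                    s.teams.push_back(t);
        }
    }
}

int main() {
    std::vector<Shard> shards = {
        {"a", "c", {Team{{1, 2}, true}}},
        {"c", "f", {Team{{1, 2}, true}}},
    };
    // Move ["b","f"): the first shard is only partially covered, the second fully.
    moveShard(shards, "b", "f", {Team{{3, 4}, true}, Team{{8, 9}, false}});
    return (shards[0].teams.size() == 3 && shards[1].teams.size() == 2) ? 0 : 1;
}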
@@ -838,11 +812,11 @@ void ShardsAffectedByTeamFailure::check() {
auto rs = shard_teams.ranges();
for(auto i = rs.begin(); i != rs.end(); ++i)
for(vector<Team>::iterator t = i->value().begin(); t != i->value().end(); ++t)
if (!team_shards.count( make_pair( *t, i->range() ) )) {
if (!team_shards.count( std::make_pair( *t, i->range() ) )) {
std::string teamDesc, shards;
for(int k=0; k<t->size(); k++)
teamDesc += format("%llx ", (*t)[k].first());
for(auto x = team_shards.lower_bound( make_pair( *t, KeyRangeRef() ) ); x != team_shards.end() && x->first == *t; ++x)
for(int k=0; k<t->servers.size(); k++)
teamDesc += format("%llx ", t->servers[k].first());
for(auto x = team_shards.lower_bound( std::make_pair( *t, KeyRangeRef() ) ); x != team_shards.end() && x->first == *t; ++x)
shards += printable(x->second.begin) + "-" + printable(x->second.end) + ",";
TraceEvent(SevError,"SATFInvariantError2")
.detail("KB", printable(i->begin()))