From fa7eaea7cf02bc710140bc27c364650877d9cb04 Mon Sep 17 00:00:00 2001
From: Evan Tschannen
Date: Thu, 8 Mar 2018 10:50:05 -0800
Subject: [PATCH] fix: shards affected by team failure did not properly handle
 separate teams for the remote and primary data centers

---
 fdbserver/DataDistribution.actor.cpp        | 130 +++++++++++++++-----
 fdbserver/DataDistribution.h                |  40 ++++--
 fdbserver/DataDistributionQueue.actor.cpp   |  15 ++-
 fdbserver/DataDistributionTracker.actor.cpp |  82 +++++-------
 4 files changed, 168 insertions(+), 99 deletions(-)

diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp
index 007bf23467..5ceb15e167 100644
--- a/fdbserver/DataDistribution.actor.cpp
+++ b/fdbserver/DataDistribution.actor.cpp
@@ -327,7 +327,7 @@ ACTOR Future<Void> storageServerFailureTracker(
 }
 
 // Read keyservers, return unique set of teams
-ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Database cx, UID masterId, MoveKeysLock moveKeysLock ) {
+ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Database cx, UID masterId, MoveKeysLock moveKeysLock, std::vector<Optional<Key>> remoteDcIds ) {
    state Reference<InitialDataDistribution> result = Reference<InitialDataDistribution>(new InitialDataDistribution);
    state Key beginKey = allKeys.begin;
 
@@ -335,8 +335,12 @@
 
    state Transaction tr( cx );
 
+   state std::map<UID, Optional<Key>> server_dc;
+   state std::map<vector<UID>, std::pair<vector<UID>, vector<UID>>> team_cache;
+
    //Get the server list in its own try/catch block since it modifies result. We don't want a subsequent failure causing entries to be duplicated
    loop {
+       server_dc.clear();
        succeeded = false;
        try {
            result->mode = 1;
@@ -364,6 +368,7 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Dat
                for( int i = 0; i < serverList.get().size(); i++ ) {
                    auto ssi = decodeServerListValue( serverList.get()[i].value );
                    result->allServers.push_back( std::make_pair(ssi, id_data[ssi.locality.processId()].processClass) );
+                   server_dc[ssi.id()] = ssi.locality.dcId();
                }
 
                break;
@@ -392,17 +397,56 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Dat
            // for each range
            for(int i = 0; i < keyServers.size() - 1; i++) {
-               KeyRangeRef keys( keyServers[i].key, keyServers[i+1].key );
+               ShardInfo info( keyServers[i].key );
                decodeKeyServersValue( keyServers[i].value, src, dest );
-               std::pair<vector<UID>,vector<UID>> teams;
-               for(int j=0; j<src.size(); j++)
-                   teams.first.push_back(src[j]);
-               for(int j=0; j<dest.size(); j++)
-                   teams.second.push_back(dest[j]);
-               result->shards.push_back( keyRangeWith(keys, teams) );
-               result->teams.insert( teams.first );
-               if (dest.size())
-                   result->teams.insert( teams.second );
+               if(remoteDcIds.size()) {
+                   auto srcIter = team_cache.find(src);
+                   if(srcIter == team_cache.end()) {
+                       for(auto& id : src) {
+                           auto& dc = server_dc[id];
+                           if(std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
+                               info.remoteSrc.push_back(id);
+                           } else {
+                               info.primarySrc.push_back(id);
+                           }
+                       }
+                       result->primaryTeams.insert( info.primarySrc );
+                       result->remoteTeams.insert( info.remoteSrc );
+                       team_cache[src] = std::make_pair(info.primarySrc, info.remoteSrc);
+                   } else {
+                       info.primarySrc = srcIter->second.first;
+                       info.remoteSrc = srcIter->second.second;
+                   }
+                   if(dest.size()) {
+                       info.hasDest = true;
+                       auto destIter = team_cache.find(dest);
+                       if(destIter == team_cache.end()) {
+                           for(auto& id : dest) {
+                               auto& dc = server_dc[id];
+                               if(std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
+                                   info.remoteDest.push_back(id);
+                               } else {
+                                   info.primaryDest.push_back(id);
+                               }
+                           }
+                           result->primaryTeams.insert( info.primaryDest );
+                           result->remoteTeams.insert( info.remoteDest );
+                           team_cache[dest] = std::make_pair(info.primaryDest, info.remoteDest);
+                       } else {
+                           info.primaryDest = destIter->second.first;
+                           info.remoteDest = destIter->second.second;
+                       }
+                   }
+               } else {
+                   info.primarySrc = src;
+                   result->primaryTeams.insert( src );
+                   if (dest.size()) {
+                       info.hasDest = true;
+                       info.primaryDest = dest;
+                       result->primaryTeams.insert( dest );
+                   }
+               }
+               result->shards.push_back( info );
            }
 
            ASSERT(keyServers.size() > 0);
@@ -420,7 +464,7 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Dat
    }
 
    // a dummy shard at the end with no keys or servers makes life easier for trackInitialShards()
-   result->shards.push_back( keyRangeWith(KeyRangeRef(allKeys.end,allKeys.end), std::pair<vector<UID>, vector<UID>>()) );
+   result->shards.push_back( ShardInfo(allKeys.end) );
 
    return result;
 }
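The hunk above is the heart of the fix: each shard's src (and dest) server list is partitioned into a primary team and a remote team according to each server's data center, and team_cache memoizes the split per distinct server list, since key ranges vastly outnumber distinct teams. A standalone sketch of that partitioning, for illustration only (not part of the patch; UID is simplified to an int and data center ids to std::string, where the real code uses UID and Optional<Key>):

    #include <algorithm>
    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    typedef int UID; // the real code uses FoundationDB's 128-bit UID

    // Split one sorted server list into (primary, remote) teams by data center,
    // memoizing per distinct list the way the patch's team_cache does.
    std::pair<std::vector<UID>, std::vector<UID>> splitTeam(
            std::vector<UID> const& servers,
            std::map<UID, std::string> const& server_dc,
            std::vector<std::string> const& remoteDcIds,
            std::map<std::vector<UID>, std::pair<std::vector<UID>, std::vector<UID>>>& team_cache) {
        auto cached = team_cache.find(servers);
        if (cached != team_cache.end()) return cached->second;

        std::pair<std::vector<UID>, std::vector<UID>> teams; // (primary, remote)
        for (UID id : servers) {
            auto const& dc = server_dc.at(id); // every server was seen in the server list
            bool remote = std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end();
            (remote ? teams.second : teams.first).push_back(id);
        }
        team_cache[servers] = teams;
        return teams;
    }

When remoteDcIds is empty (single-region configurations), the patch skips the split entirely and records the whole list as the primary team, which keeps the old behavior.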
@@ -480,6 +524,7 @@ struct DDTeamCollection {
 
    std::vector<Optional<Key>> includedDCs;
    Optional<std::vector<Optional<Key>>> otherTrackedDCs;
+   bool primary;
 
    DDTeamCollection(
        Database const& cx,
        UID masterId,
@@ -490,12 +535,12 @@ struct DDTeamCollection {
        std::vector<Optional<Key>> includedDCs,
        Optional<std::vector<Optional<Key>>> otherTrackedDCs,
        PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& serverChanges,
-       Future<Void> readyToStart, Reference<AsyncVar<bool>> zeroHealthyTeams )
+       Future<Void> readyToStart, Reference<AsyncVar<bool>> zeroHealthyTeams, bool primary )
        :cx(cx), masterId(masterId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams( true ), teamBuilder( Void() ),
         configuration(configuration), serverChanges(serverChanges),
         initialFailureReactionDelay( delay( BUGGIFY ? 0 : SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskDataDistribution ) ), healthyTeamCount( 0 ),
         initializationDoneActor(logOnCompletion(readyToStart && initialFailureReactionDelay, this)), optimalTeamCount( 0 ), recruitingStream(0), restartRecruiting( SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY ),
-        unhealthyServers(0), includedDCs(includedDCs), otherTrackedDCs(otherTrackedDCs), zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true)
+        unhealthyServers(0), includedDCs(includedDCs), otherTrackedDCs(otherTrackedDCs), zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true), primary(primary)
    {
        TraceEvent("DDTrackerStarting", masterId)
            .detail( "State", "Inactive" )
@@ -759,8 +804,14 @@ struct DDTeamCollection {
            }
        }
 
-       for(auto t = initTeams.teams.begin(); t != initTeams.teams.end(); ++t) {
-           addTeam(t->begin(), t->end() );
+       if(primary) {
+           for(auto t = initTeams.primaryTeams.begin(); t != initTeams.primaryTeams.end(); ++t) {
+               addTeam(t->begin(), t->end() );
+           }
+       } else {
+           for(auto t = initTeams.remoteTeams.begin(); t != initTeams.remoteTeams.end(); ++t) {
+               addTeam(t->begin(), t->end() );
+           }
        }
 
        addSubsetOfEmergencyTeams();
@@ -830,9 +881,6 @@ struct DDTeamCollection {
            }
        }
 
-       if(newTeamServers.empty()) {
-           return;
-       }
        Reference<TCTeamInfo> teamInfo( new TCTeamInfo( newTeamServers ) );
        TraceEvent("TeamCreation", masterId).detail("Team", teamInfo->getDesc());
        teamInfo->tracker = teamTracker( this, teamInfo );
@@ -1290,20 +1338,20 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<TCTeamInfo
            lastZeroHealthy = self->zeroHealthyTeams->get(); //set this again in case it changed from this teams health changing
            if( self->initialFailureReactionDelay.isReady() && !self->zeroHealthyTeams->get() ) {
-               vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( team->getServerIDs() );
+               vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team(team->getServerIDs(), self->primary) );
 
                for(int i=0; i<shards.size(); i++) {
                    int maxPriority = team->getPriority();
                    auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
                    for( int t=0; t<teams.size(); t++) {
-                       if( self->server_info.count( teams[t][0] ) ) {
-                           auto& info = self->server_info[teams[t][0]];
+                       if( self->server_info.count( teams[t].servers[0] ) ) {
+                           auto& info = self->server_info[teams[t].servers[0]];
 
                            bool found = false;
                            for( int i = 0; i < info->teams.size(); i++ ) {
-                               if( info->teams[i]->serverIDs == teams[t] ) {
+                               if( info->teams[i]->serverIDs == teams[t].servers ) {
                                    maxPriority = std::max( maxPriority, info->teams[i]->getPriority() );
                                    found = true;
                                    break;
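The teamTracker change above means each team collection only asks for the shards of its own region's team, but the priority it assigns still considers every team (primary and remote) that serves the shard. A minimal sketch of that escalation rule, for illustration only (not part of the patch):

    #include <algorithm>
    #include <vector>

    // A shard served by a degraded team is re-queued at the highest priority
    // among all teams that still hold it, so a shard that is unhealthy in one
    // region is not downgraded by a healthy team in the other region.
    int shardRelocationPriority(int degradedTeamPriority, std::vector<int> const& otherTeamPriorities) {
        int maxPriority = degradedTeamPriority;
        for (int p : otherTeamPriorities)
            maxPriority = std::max(maxPriority, p);
        return maxPriority;
    }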
@@ -1826,9 +1874,10 @@ ACTOR Future<Void> dataDistributionTeamCollection(
    Optional<std::vector<Optional<Key>>> otherTrackedDCs,
    PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges,
    Future<Void> readyToStart,
-   Reference<AsyncVar<bool>> zeroHealthyTeams )
+   Reference<AsyncVar<bool>> zeroHealthyTeams,
+   bool primary)
 {
-   state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, configuration, includedDCs, otherTrackedDCs, serverChanges, readyToStart, zeroHealthyTeams );
+   state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, configuration, includedDCs, otherTrackedDCs, serverChanges, readyToStart, zeroHealthyTeams, primary );
    state Future<Void> loggingTrigger = Void();
    state PromiseStream<Void> serverRemoved;
    state Future<Void> interfaceChanges;
@@ -2113,9 +2162,9 @@ ACTOR Future<Void> dataDistribution(
    TraceEvent("DDInitTakingMoveKeysLock", mi.id());
    state MoveKeysLock lock = wait( takeMoveKeysLock( cx, mi.id() ) );
    TraceEvent("DDInitTookMoveKeysLock", mi.id());
-   state Reference<InitialDataDistribution> initData = wait( getInitialDataDistribution(cx, mi.id(), lock) );
+   state Reference<InitialDataDistribution> initData = wait( getInitialDataDistribution(cx, mi.id(), lock, configuration.remoteTLogReplicationFactor > 0 ? remoteDcIds : std::vector<Optional<Key>>() ) );
    if(initData->shards.size() > 1) {
-       TraceEvent("DDInitGotInitialDD", mi.id()).detail("b", printable(initData->shards.end()[-2].begin)).detail("e", printable(initData->shards.end()[-2].end)).detail("src", describe(initData->shards.end()[-2].value.first)).detail("dest", describe(initData->shards.end()[-2].value.second)).trackLatest("InitialDD");
+       TraceEvent("DDInitGotInitialDD", mi.id()).detail("b", printable(initData->shards.end()[-2].key)).detail("e", printable(initData->shards.end()[-1].key)).detail("src", describe(initData->shards.end()[-2].primarySrc)).detail("dest", describe(initData->shards.end()[-2].primaryDest)).trackLatest("InitialDD");
    } else {
        TraceEvent("DDInitGotInitialDD", mi.id()).detail("b","").detail("e", "").detail("src", "[no items]").detail("dest", "[no items]").trackLatest("InitialDD");
    }
@@ -2168,13 +2217,29 @@
            Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure );
 
+           for(int s=0; s<initData->shards.size() - 1; s++) {
+               KeyRangeRef keys = KeyRangeRef(initData->shards[s].key, initData->shards[s+1].key);
+               shardsAffectedByTeamFailure->defineShard(keys);
+               std::vector<ShardsAffectedByTeamFailure::Team> teams;
+               teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[s].primarySrc, true));
+               if(configuration.remoteTLogReplicationFactor > 0) {
+                   teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[s].remoteSrc, false));
+               }
+               shardsAffectedByTeamFailure->moveShard(keys, teams);
+               if(initData->shards[s].hasDest) {
+                   // This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in
+                   // DataDistributionQueue to track it, but it's easier to just (with low priority) schedule it for movement.
+                   output.send( RelocateShard( keys, PRIORITY_RECOVER_MOVE ) );
+               }
+           }
+
            actors.push_back( pollMoveKeysLock(cx, lock) );
            actors.push_back( popOldTags( cx, logSystem, recoveryCommitVersion) );
-           actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, shardsAffectedByTeamFailure, output, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
+           actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
            actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, configuration.durableStorageQuorum, lastLimited ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
-           actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, configuration.remoteTLogReplicationFactor > 0 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0] ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
+           actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, configuration.remoteTLogReplicationFactor > 0 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
            if (configuration.remoteTLogReplicationFactor > 0) {
-               actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[1] ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
+               actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[1], false ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
            }
 
            Void _ = wait( waitForAll( actors ) );
@@ -2215,7 +2280,8 @@ DDTeamCollection* testTeamCollection(int teamSize, IRepPolicyRef policy, int pro
        {},
        PromiseStream<std::pair<UID, Optional<StorageServerInterface>>>(),
        Future<Void>(Void()),
-       Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) )
+       Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) ),
+       true
    );
 
    for(int id = 1; id <= processCount; id++) {
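The bootstrap loop added to dataDistribution() above replaces the seeding that trackInitialShards used to do (see the DataDistributionTracker.actor.cpp hunks below): each initial shard now registers one team per region, and shards with an in-flight destination are simply re-queued at low priority. A simplified stand-in for that loop, illustration only (hypothetical types, not the patch's):

    #include <string>
    #include <vector>

    // Simplified stand-ins for the patch's ShardInfo and the sABTF seeding loop.
    struct ShardInfoSketch {
        std::string key;              // begin key; the next entry's key is this shard's end
        std::vector<int> primarySrc;  // servers currently holding the shard in the primary DC
        std::vector<int> remoteSrc;   // servers currently holding it in the remote DC
        bool hasDest = false;         // a relocation was already in flight at recovery
    };

    struct SeededShard {
        std::string begin, end;
        std::vector<std::vector<int>> teams; // primary source team, then remote source team
        bool rescheduled;                    // re-queued at low (recover-move) priority
    };

    std::vector<SeededShard> seedShards(std::vector<ShardInfoSketch> const& shards, bool hasRemote) {
        std::vector<SeededShard> seeded;
        // shards.back() is the dummy boundary at allKeys.end, so s+1 is always valid
        for (size_t s = 0; s + 1 < shards.size(); s++) {
            SeededShard out{shards[s].key, shards[s + 1].key, {}, shards[s].hasDest};
            out.teams.push_back(shards[s].primarySrc);
            if (hasRemote)
                out.teams.push_back(shards[s].remoteSrc);
            seeded.push_back(out);
        }
        return seeded;
    }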
diff --git a/fdbserver/DataDistribution.h b/fdbserver/DataDistribution.h
index ed2a35e7c1..93de1b9a3d 100644
--- a/fdbserver/DataDistribution.h
+++ b/fdbserver/DataDistribution.h
@@ -120,7 +120,23 @@ struct TeamCollectionInterface {
 class ShardsAffectedByTeamFailure : public ReferenceCounted<ShardsAffectedByTeamFailure> {
 public:
    ShardsAffectedByTeamFailure() {}
-   typedef vector<UID> Team;  // sorted
+
+   struct Team {
+       vector<UID> servers;  // sorted
+       bool primary;
+
+       Team() : primary(true) {}
+       Team(vector<UID> const& servers, bool primary) : servers(servers), primary(primary) {}
+
+       bool operator < ( const Team& r ) const {
+           if( servers == r.servers ) return primary < r.primary;
+           return servers < r.servers;
+       }
+       bool operator == ( const Team& r ) const {
+           return servers == r.servers && primary == r.primary;
+       }
+   };
 
    // This tracks the data distribution on the data distribution server so that teamTrackers can
    // relocate the right shards when a team is degraded.
@@ -138,9 +154,9 @@
 
    int getNumberOfShards( UID ssID );
    vector<KeyRange> getShardsFor( Team team );
-   vector<vector<UID>> getTeamsFor( KeyRangeRef keys );
+   vector<Team> getTeamsFor( KeyRangeRef keys );
    void defineShard( KeyRangeRef keys );
-   void moveShard( KeyRangeRef keys, Team destinationTeam );
+   void moveShard( KeyRangeRef keys, std::vector<Team> destinationTeam );
    void check();
 private:
    struct OrderByTeamKey {
@@ -159,12 +175,23 @@ private:
    void insert(Team team, KeyRange const& range);
 };
 
+struct ShardInfo {
+   Key key;
+   vector<UID> primarySrc;
+   vector<UID> remoteSrc;
+   vector<UID> primaryDest;
+   vector<UID> remoteDest;
+   bool hasDest;
+
+   explicit ShardInfo(Key key) : key(key), hasDest(false) {}
+};
+
 struct InitialDataDistribution : ReferenceCounted<InitialDataDistribution> {
-   typedef vector<UID> Team; // sorted
    int mode;
    vector<std::pair<StorageServerInterface, ProcessClass>> allServers;
-   std::set< Team > teams;
-   vector<KeyRangeWith<std::pair<Team, Team>>> shards;
+   std::set<vector<UID>> primaryTeams;
+   std::set<vector<UID>> remoteTeams;
+   vector<ShardInfo> shards;
 };
 
 Future<Void> dataDistribution(
@@ -180,7 +207,6 @@
 Future<Void> dataDistributionTracker(
    Reference<InitialDataDistribution> const& initData,
    Database const& cx,
-   Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
    PromiseStream<RelocateShard> const& output,
    PromiseStream<GetMetricsRequest> const& getShardMetrics,
    FutureStream<Promise<int64_t>> const& getAverageShardBytes,
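The Team comparator above makes (servers, primary) the identity of a team, which is what lets the primary and remote team collections share one ShardsAffectedByTeamFailure without their entries colliding. A compilable illustration of the ordering, for this note only (UID simplified to int; not the real class):

    #include <cassert>
    #include <set>
    #include <vector>

    struct Team {
        std::vector<int> servers; // sorted
        bool primary;
        bool operator<(const Team& r) const {
            if (servers == r.servers) return primary < r.primary;
            return servers < r.servers;
        }
    };

    int main() {
        std::set<Team> teams;
        teams.insert(Team{{1, 2, 3}, true});   // team registered by the primary collection
        teams.insert(Team{{1, 2, 3}, false});  // same server list, registered as remote
        assert(teams.size() == 2);             // distinct keys, not a duplicate
    }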
diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp
index ee1c4960be..8bac1b521c 100644
--- a/fdbserver/DataDistributionQueue.actor.cpp
+++ b/fdbserver/DataDistributionQueue.actor.cpp
@@ -837,6 +837,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
    state bool signalledTransferComplete = false;
    state UID masterId = self->mi.id();
    state ParallelTCInfo destination;
+   state std::vector<ShardsAffectedByTeamFailure::Team> destinationTeams;
    state ParallelTCInfo healthyDestinations;
    state bool anyHealthy = false;
    state int durableStorageQuorum = 0;
@@ -865,6 +866,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
                state int tciIndex = 0;
                state bool foundTeams = true;
                destination.clear();
+               destinationTeams.clear();
                healthyDestinations.clear();
                anyHealthy = false;
                durableStorageQuorum = 0;
@@ -883,6 +885,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
                        Optional<Reference<IDataDistributionTeam>> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)));
                        if (bestTeam.present()) {
                            destination.addTeam(bestTeam.get());
+                           destinationTeams.push_back(ShardsAffectedByTeamFailure::Team(bestTeam.get()->getServerIDs(), tciIndex == 0));
                            if(bestTeam.get()->isHealthy()) {
                                healthyDestinations.addTeam(bestTeam.get());
                                anyHealthy = true;
@@ -912,7 +915,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
                Void _ = wait( delay( SERVER_KNOBS->BEST_TEAM_STUCK_DELAY, TaskDataDistributionLaunch ) );
            }
 
-           self->shardsAffectedByTeamFailure->moveShard(rd.keys, destination.getServerIDs());
+           self->shardsAffectedByTeamFailure->moveShard(rd.keys, destinationTeams);
 
            //FIXME: do not add data in flight to servers that were already in the src.
            destination.addDataInFlightToTeam(+metrics.bytes);
@@ -1009,12 +1012,12 @@
-ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<IDataDistributionTeam> sourceTeam, Reference<IDataDistributionTeam> destTeam ) {
+ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<IDataDistributionTeam> sourceTeam, Reference<IDataDistributionTeam> destTeam, bool primary ) {
    if(g_network->isSimulated() && g_simulator.speedUpSimulation) {
        return false;
    }
 
-   std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( sourceTeam->getServerIDs() );
+   std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
 
    if( !shards.size() )
        return false;
@@ -1028,7 +1031,7 @@
-       std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( sourceTeam->getServerIDs() );
+       std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
        for( int i = 0; i < shards.size(); i++ ) {
            if( moveShard == shards[i] ) {
                TraceEvent(priority == PRIORITY_REBALANCE_OVERUTILIZED_TEAM ? "BgDDMountainChopper" : "BgDDValleyFiller", self->mi.id())
@@ -1057,7 +1060,7 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
            if( randomTeam.get()->getMinFreeSpaceRatio() > SERVER_KNOBS->FREE_SPACE_RATIO_DD_CUTOFF ) {
                state Optional<Reference<IDataDistributionTeam>> loadedTeam = wait( brokenPromiseToNever( self->teamCollections[teamCollectionIndex].getTeam.getReply( GetTeamRequest( true, true, false ) ) ) );
                if( loadedTeam.present() ) {
-                   bool moved = wait( rebalanceTeams( self, PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.get(), randomTeam.get() ) );
+                   bool moved = wait( rebalanceTeams( self, PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.get(), randomTeam.get(), teamCollectionIndex == 0 ) );
                    if(moved) {
                        resetCount = 0;
                    } else {
@@ -1092,7 +1095,7 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
        state Optional<Reference<IDataDistributionTeam>> unloadedTeam = wait( brokenPromiseToNever( self->teamCollections[teamCollectionIndex].getTeam.getReply( GetTeamRequest( true, true, true ) ) ) );
        if( unloadedTeam.present() ) {
            if( unloadedTeam.get()->getMinFreeSpaceRatio() > SERVER_KNOBS->FREE_SPACE_RATIO_DD_CUTOFF ) {
-               bool moved = wait( rebalanceTeams( self, PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.get(), unloadedTeam.get() ) );
+               bool moved = wait( rebalanceTeams( self, PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.get(), unloadedTeam.get(), teamCollectionIndex == 0 ) );
                if(moved) {
                    resetCount = 0;
                } else {
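Throughout the queue changes above, team collection index 0 is the primary region, so every sABTF Team built from a chosen destination is tagged primary exactly when tciIndex (or teamCollectionIndex) is 0. An illustrative helper capturing that convention (hypothetical, not in the patch):

    #include <vector>

    struct Team {
        std::vector<int> servers;
        bool primary;
    };

    // Index 0 of teamCollections is always the primary region's collection,
    // so the Team recorded for each destination carries primary = (tci == 0).
    std::vector<Team> buildDestinationTeams(std::vector<std::vector<int>> const& bestTeamsByTci) {
        std::vector<Team> destinationTeams;
        for (size_t tciIndex = 0; tciIndex < bestTeamsByTci.size(); tciIndex++)
            destinationTeams.push_back(Team{bestTeamsByTci[tciIndex], tciIndex == 0});
        return destinationTeams;
    }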
diff --git a/fdbserver/DataDistributionTracker.actor.cpp b/fdbserver/DataDistributionTracker.actor.cpp
index 61db4b6945..3cc754daa2 100644
--- a/fdbserver/DataDistributionTracker.actor.cpp
+++ b/fdbserver/DataDistributionTracker.actor.cpp
@@ -598,9 +598,7 @@ void restartShardTrackers( DataDistributionTracker* self, KeyRangeRef keys, Opti
    }
 }
 
-ACTOR Future<Void> trackInitialShards(DataDistributionTracker *self,
-                                      Reference<InitialDataDistribution> initData,
-                                      Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure)
+ACTOR Future<Void> trackInitialShards(DataDistributionTracker *self, Reference<InitialDataDistribution> initData)
 {
    TraceEvent("TrackInitialShards", self->masterId).detail("InitialShardCount", initData->shards.size());
 
@@ -608,35 +606,9 @@
    //SOMEDAY: Figure out what this priority should actually be
    Void _ = wait( delay( 0.0, TaskDataDistribution ) );
 
-   state int lastBegin = -1;
-   state vector<UID> last;
    state int s;
-   for(s=0; s<initData->shards.size(); s++) {
-       state InitialDataDistribution::Team src = initData->shards[s].value.first;
-       auto& dest = initData->shards[s].value.second;
-       if (dest.size()) {
-           // This shard is already in flight.  Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in
-           // DataDistributionQueue to track it, but it's easier to just (with low priority) schedule it for movement.
-           self->output.send( RelocateShard( initData->shards[s], PRIORITY_RECOVER_MOVE ) );
-       }
-
-       // The following clause was here for no remembered reason. It was removed, however, because on resumption of stopped
-       // clusters (of size 3) it was grouping all the the shards in the system into one, and then splitting them all back out,
-       // causing unecessary data distribution.
-       //if (s==0 || s+1==initData.shards.size() || lastBegin<0 || src != last || initData.shards[s].begin == keyServersPrefix) {
-           // end current run, start a new shardTracker
-           // relies on the dummy shard at allkeysend
-
-           if (lastBegin >= 0) {
-               state KeyRangeRef keys( initData->shards[lastBegin].begin, initData->shards[s].begin );
-               restartShardTrackers( self, keys );
-               shardsAffectedByTeamFailure->defineShard( keys );
-               shardsAffectedByTeamFailure->moveShard( keys, last );
-           }
-           lastBegin = s;
-           last = src;
-       //}
+   for(s=0; s<initData->shards.size()-1; s++) {
+       restartShardTrackers( self, KeyRangeRef( initData->shards[s].key, initData->shards[s+1].key ) );
        Void _ = wait( yield( TaskDataDistribution ) );
    }
 
@@ -692,7 +664,6 @@ ACTOR Future<Void> fetchShardMetrics( DataDistributionTracker* self, GetMetricsR
 ACTOR Future<Void> dataDistributionTracker(
    Reference<InitialDataDistribution> initData,
    Database cx,
-   Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
    PromiseStream<RelocateShard> output,
    PromiseStream<GetMetricsRequest> getShardMetrics,
    FutureStream<Promise<int64_t>> getAverageShardBytes,
@@ -703,7 +674,7 @@
    state DataDistributionTracker self(cx, masterId, readyToStart, output, anyZeroHealthyTeams);
    state Future<Void> loggingTrigger = Void();
    try {
-       Void _ = wait( trackInitialShards( &self, initData, shardsAffectedByTeamFailure ) );
+       Void _ = wait( trackInitialShards( &self, initData ) );
        initData = Reference<InitialDataDistribution>();
 
        loop choose {
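With the sABTF seeding moved into dataDistribution(), trackInitialShards only needs to start one tracker per range between consecutive boundary keys; the dummy shard at allKeys.end guarantees the s+1 lookup is always valid. A sketch of that iteration pattern, illustration only (keys simplified to std::string):

    #include <string>
    #include <utility>
    #include <vector>

    // Consecutive boundary keys become half-open tracker ranges; the dummy
    // boundary appended at the end gives every real shard a right edge.
    std::vector<std::pair<std::string, std::string>> trackerRanges(std::vector<std::string> const& boundaries) {
        std::vector<std::pair<std::string, std::string>> ranges;
        for (size_t s = 0; s + 1 < boundaries.size(); s++)
            ranges.push_back({boundaries[s], boundaries[s + 1]}); // [begin, end)
        return ranges;
    }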
@@ -731,9 +702,7 @@
 
 vector<KeyRange> ShardsAffectedByTeamFailure::getShardsFor( Team team ) {
    vector<KeyRange> r;
-   for(auto it = team_shards.lower_bound( std::pair<Team, KeyRange>( team, KeyRangeRef() ) );
-       it != team_shards.end() && it->first == team;
-       ++it)
+   for(auto it = team_shards.lower_bound( std::pair<Team, KeyRange>( team, KeyRangeRef() ) ); it != team_shards.end() && it->first == team; ++it)
        r.push_back( it->second );
    return r;
 }
@@ -742,20 +711,20 @@
 int ShardsAffectedByTeamFailure::getNumberOfShards( UID ssID ) {
    return storageServerShards[ssID];
 }
 
-vector<vector<UID>> ShardsAffectedByTeamFailure::getTeamsFor( KeyRangeRef keys ) {
+vector<ShardsAffectedByTeamFailure::Team> ShardsAffectedByTeamFailure::getTeamsFor( KeyRangeRef keys ) {
    return shard_teams[keys.begin];
 }
 
 void ShardsAffectedByTeamFailure::erase(Team team, KeyRange const& range) {
    if(team_shards.erase( std::pair<Team, KeyRange>(team, range) ) > 0) {
-       for(auto uid = team.begin(); uid != team.end(); ++uid)
+       for(auto uid = team.servers.begin(); uid != team.servers.end(); ++uid)
            storageServerShards[*uid]--;
    }
 }
 
 void ShardsAffectedByTeamFailure::insert(Team team, KeyRange const& range) {
    if(team_shards.insert( std::pair<Team, KeyRange>( team, range ) ).second) {
-       for(auto uid = team.begin(); uid != team.end(); ++uid)
+       for(auto uid = team.servers.begin(); uid != team.servers.end(); ++uid)
            storageServerShards[*uid]++;
    }
 }
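getShardsFor, erase, and insert above all rely on team_shards being an ordered set of (Team, range) pairs — the real class orders them with OrderByTeamKey — so one team's ranges sit contiguously and a lower_bound scan enumerates them, while storageServerShards keeps a per-server shard count in step. A self-contained sketch of that bookkeeping, using default pair ordering as a simplification (not the patch's code):

    #include <map>
    #include <set>
    #include <string>
    #include <utility>
    #include <vector>

    struct Team {
        std::vector<int> servers;
        bool primary;
        bool operator<(const Team& r) const {
            if (servers == r.servers) return primary < r.primary;
            return servers < r.servers;
        }
        bool operator==(const Team& r) const { return servers == r.servers && primary == r.primary; }
    };

    typedef std::pair<std::string, std::string> Range; // [begin, end)

    struct Tracker {
        std::set<std::pair<Team, Range>> team_shards;  // ordered: all of a team's ranges are adjacent
        std::map<int, int> storageServerShards;        // per-server count of tracked shards

        void insert(Team const& team, Range const& range) {
            if (team_shards.insert({team, range}).second)
                for (int uid : team.servers) storageServerShards[uid]++;
        }
        void erase(Team const& team, Range const& range) {
            if (team_shards.erase({team, range}) > 0)
                for (int uid : team.servers) storageServerShards[uid]--;
        }
        std::vector<Range> getShardsFor(Team const& team) {
            std::vector<Range> r;
            // scan the contiguous run of entries whose key equals this team
            for (auto it = team_shards.lower_bound({team, Range()}); it != team_shards.end() && it->first == team; ++it)
                r.push_back(it->second);
            return r;
        }
    };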
@@ -787,7 +756,7 @@ void ShardsAffectedByTeamFailure::defineShard( KeyRangeRef keys ) {
    check();
 }
 
-void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, Team destinationTeam ) {
+void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, std::vector<Team> destinationTeams ) {
    /*TraceEvent("ShardsAffectedByTeamFailureMove")
        .detail("KeyBegin", printable(keys.begin))
        .detail("KeyEnd", printable(keys.end))
@@ -795,31 +764,36 @@ void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, Team destinationT
        .detail("NewTeam", describe(destinationTeam));*/
 
    auto ranges = shard_teams.intersectingRanges( keys );
-   std::vector< std::pair<Team,KeyRange> > modifiedShards;
+   std::vector< std::pair<std::vector<Team>,KeyRange> > modifiedShards;
    for(auto it = ranges.begin(); it != ranges.end(); ++it) {
        if( keys.contains( it->range() ) ) {
            // erase the many teams that were associated with this one shard
            for(auto t = it->value().begin(); t != it->value().end(); ++t) {
                erase(*t, it->range());
            }
 
            // save this modification for later insertion
-           modifiedShards.push_back( std::pair<Team,KeyRange>( destinationTeam, it->range() ) );
+           modifiedShards.push_back( std::pair<std::vector<Team>,KeyRange>( destinationTeams, it->range() ) );
        } else {
            // for each range that touches this move, add our team as affecting this range
-           insert(destinationTeam, it->range());
-
-           // if we are not in the list of teams associated with this shard, add us in
-           auto& teams = it->value();
-           if( std::find( teams.begin(), teams.end(), destinationTeam ) == teams.end() )
-               teams.push_back( destinationTeam );
+           for(auto& team : destinationTeams) {
+               insert(team, it->range());
+
+               // if we are not in the list of teams associated with this shard, add us in
+               auto& teams = it->value();
+               if( std::find( teams.begin(), teams.end(), team ) == teams.end() ) {
+                   teams.push_back( team );
+               }
+           }
        }
    }
 
    // we cannot modify the KeyRangeMap while iterating through it, so add saved modifications now
    for( int i = 0; i < modifiedShards.size(); i++ ) {
-       insert(modifiedShards[i].first, modifiedShards[i].second);
-       shard_teams.insert( modifiedShards[i].second, vector<Team>( 1, modifiedShards[i].first ) );
+       for( auto& t : modifiedShards[i].first) {
+           insert(t, modifiedShards[i].second);
+       }
+       shard_teams.insert( modifiedShards[i].second, modifiedShards[i].first );
    }
 
    check();
@@ -838,11 +812,11 @@ void ShardsAffectedByTeamFailure::check() {
        auto rs = shard_teams.ranges();
        for(auto i = rs.begin(); i != rs.end(); ++i)
            for(vector<Team>::iterator t = i->value().begin(); t != i->value().end(); ++t)
-               if (!team_shards.count( make_pair( *t, i->range() ) )) {
+               if (!team_shards.count( std::make_pair( *t, i->range() ) )) {
                    std::string teamDesc, shards;
-                   for(int k=0; k<t->size(); k++)
-                       teamDesc += format("%llx ", (*t)[k].first());
-                   for(auto x = team_shards.lower_bound( make_pair( *t, KeyRangeRef() ) ); x != team_shards.end() && x->first == *t; ++x)
+                   for(int k=0; k<t->servers.size(); k++)
+                       teamDesc += format("%llx ", t->servers[k].first());
+                   for(auto x = team_shards.lower_bound( std::make_pair( *t, KeyRangeRef() ) ); x != team_shards.end() && x->first == *t; ++x)
                        shards += printable(x->second.begin) + "-" + printable(x->second.end) + ",";
                    TraceEvent(SevError,"SATFInvariantError2")
                        .detail("KB", printable(i->begin()))