fix: a team tracker could downgrade the priority of a relocation issued by the team tracker for the other region

Author: Evan Tschannen 2018-11-09 10:07:55 -08:00
parent 6874e379fc
commit 3e2484baf7
2 changed files with 96 additions and 77 deletions
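Background for the change below: shardsAffectedByTeamFailure tracks teams from both regions for each shard, but the old teamTracker loop only looked servers up in its own server_info map, so teams belonging to the other region were silently skipped when computing maxPriority. A tracker could therefore re-send a RelocateShard at a priority below the one the other region's tracker had already issued, downgrading the relocation. The fix walks every DDTeamCollection in self->teamCollections, accumulates the local and remote maxima separately, and sends std::max(maxPriority, maxOtherPriority), while still using only the local maxPriority to decide whether to issue at all. A minimal standalone model of that logic follows (hypothetical RegionCollection type and plain STL containers; the real code walks TCTeamInfo objects inside Flow actors):

// Standalone sketch of the patched priority computation in teamTracker.
// RegionCollection is a hypothetical stand-in for DDTeamCollection: it maps
// a shard to the priorities of the teams this region knows about.
#include <algorithm>
#include <cstdio>
#include <map>
#include <string>
#include <vector>

struct RegionCollection {
    bool primary;
    std::map<std::string, std::vector<int>> shardTeamPriorities;
};

// Mirrors the patched loop: scan every collection, accumulating the
// local-region and other-region maxima separately.
int relocationPriority(const std::string& shard, bool selfPrimary, int teamPriority,
                       const std::vector<RegionCollection*>& collections) {
    int maxPriority = teamPriority;      // teams in this tracker's region
    int maxOtherPriority = maxPriority;  // teams in the other region
    for (auto* tc : collections) {
        auto it = tc->shardTeamPriorities.find(shard);
        if (it == tc->shardTeamPriorities.end()) continue;
        for (int p : it->second) {
            if (tc->primary == selfPrimary)
                maxPriority = std::max(maxPriority, p);
            else
                maxOtherPriority = std::max(maxOtherPriority, p);
        }
    }
    // The fix: never send a priority below what the other region already
    // issued (rs.priority = std::max(maxPriority, maxOtherPriority)).
    return std::max(maxPriority, maxOtherPriority);
}

int main() {
    RegionCollection primary{true, {{"shard", {100}}}};
    RegionCollection remote{false, {{"shard", {200}}}};  // remote issued higher
    std::vector<RegionCollection*> all{&primary, &remote};
    // Before the fix the primary tracker would have re-sent this shard at
    // priority 100, downgrading the remote tracker's 200; now it sends 200.
    printf("issued priority: %d\n", relocationPriority("shard", true, 100, all));
    return 0;
}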


@@ -512,7 +512,7 @@ Future<Void> storageServerTracker(
Future<Void> teamTracker( struct DDTeamCollection* const& self, Reference<TCTeamInfo> const& team, bool const& badTeam );
struct DDTeamCollection {
struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
enum { REQUESTING_WORKER = 0, GETTING_WORKER = 1, GETTING_STORAGE = 2 };
PromiseStream<Future<Void>> addActor;
@@ -564,6 +564,8 @@ struct DDTeamCollection {
Reference<LocalitySet> storageServerSet;
std::vector<LocalityEntry> forcedEntries, resultEntries;
std::vector<DDTeamCollection*> teamCollections;
void resetLocalitySet() {
storageServerSet = Reference<LocalitySet>(new LocalityMap<UID>());
LocalityMap<UID>* storageServerMap = (LocalityMap<UID>*) storageServerSet.getPtr();
@@ -631,7 +633,7 @@ struct DDTeamCollection {
teamBuilder.cancel();
}
ACTOR static Future<Void> logOnCompletion( Future<Void> signal, DDTeamCollection *self ) {
ACTOR static Future<Void> logOnCompletion( Future<Void> signal, DDTeamCollection* self ) {
Void _ = wait(signal);
Void _ = wait(delay(SERVER_KNOBS->LOG_ON_COMPLETION_DELAY, TaskDataDistribution));
@@ -835,7 +837,7 @@ struct DDTeamCollection {
return total;
}
ACTOR static Future<Void> addSubsetOfEmergencyTeams( DDTeamCollection *self ) {
ACTOR static Future<Void> addSubsetOfEmergencyTeams( DDTeamCollection* self ) {
state int idx = 0;
state std::vector<Reference<TCServerInfo>> servers;
state std::vector<UID> serverIds;
@@ -912,7 +914,7 @@ struct DDTeamCollection {
return Void();
}
ACTOR static Future<Void> init( DDTeamCollection *self, Reference<InitialDataDistribution> initTeams ) {
ACTOR static Future<Void> init( DDTeamCollection* self, Reference<InitialDataDistribution> initTeams ) {
// SOMEDAY: If some servers have teams and not others (or some servers have more data than others) and there is an address/locality collision, should
// we preferentially mark the least used server as undesirable?
for (auto i = initTeams->allServers.begin(); i != initTeams->allServers.end(); ++i) {
@@ -1013,7 +1015,7 @@ struct DDTeamCollection {
addTeam(team.begin(), team.end());
}
ACTOR Future<Void> addAllTeams( DDTeamCollection *self, int location, vector<LocalityEntry>* history, Reference<LocalityMap<UID>> processes, vector<std::vector<UID>>* output, int teamLimit, int* addedTeams ) {
ACTOR Future<Void> addAllTeams( DDTeamCollection* self, int location, vector<LocalityEntry>* history, Reference<LocalityMap<UID>> processes, vector<std::vector<UID>>* output, int teamLimit, int* addedTeams ) {
Void _ = wait( yield( TaskDataDistributionLaunch ) );
// Add team, if valid
@@ -1048,7 +1050,7 @@ struct DDTeamCollection {
return Void();
}
ACTOR Future<int> addAllTeams( DDTeamCollection *self, vector<UID> input, vector<std::vector<UID>>* output, int teamLimit ) {
ACTOR Future<int> addAllTeams( DDTeamCollection* self, vector<UID> input, vector<std::vector<UID>>* output, int teamLimit ) {
state int addedTeams = 0;
state vector<LocalityEntry> history;
state Reference<LocalityMap<UID>> processes(new LocalityMap<UID>());
@@ -1373,7 +1375,7 @@ struct DDTeamCollection {
};
// Track a team and issue RelocateShards when the level of degradation changes
ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<TCTeamInfo> team, bool badTeam ) {
ACTOR Future<Void> teamTracker( DDTeamCollection* self, Reference<TCTeamInfo> team, bool badTeam ) {
state int lastServersLeft = team->getServerIDs().size();
state bool lastAnyUndesired = false;
state bool logTeamEvents = g_network->isSimulated() || !badTeam;
@@ -1512,24 +1514,35 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<TCTeamInfo> te
for(int i=0; i<shards.size(); i++) {
int maxPriority = team->getPriority();
int maxOtherPriority = maxPriority;
if(maxPriority < PRIORITY_TEAM_0_LEFT) {
auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
for( int t=0; t<teams.size(); t++) {
if( teams[t].servers.size() && self->server_info.count( teams[t].servers[0] ) ) {
auto& info = self->server_info[teams[t].servers[0]];
if( teams[t].servers.size() ) {
bool foundServer = false;
for(auto tc : self->teamCollections) {
if( tc->server_info.count( teams[t].servers[0] ) ) {
ASSERT(!foundServer);
foundServer = true;
auto& info = tc->server_info[teams[t].servers[0]];
bool found = false;
for( int i = 0; i < info->teams.size(); i++ ) {
if( info->teams[i]->serverIDs == teams[t].servers ) {
maxPriority = std::max( maxPriority, info->teams[i]->getPriority() );
found = true;
break;
bool found = false;
for( int i = 0; i < info->teams.size(); i++ ) {
if( info->teams[i]->serverIDs == teams[t].servers ) {
if(teams[t].primary == self->primary) {
maxPriority = std::max( maxPriority, info->teams[i]->getPriority() );
} else {
maxOtherPriority = std::max( maxOtherPriority, info->teams[i]->getPriority() );
}
found = true;
break;
}
}
TEST(!found); // A removed team is still associated with a shard in SABTF
}
}
TEST(!found); // A removed team is still associated with a shard in SABTF
} else {
TEST(teams[t].servers.size()); // A removed server is still associated with a team in SABTF
TEST(!foundServer); // A removed server is still associated with a team in SABTF
}
}
}
@@ -1537,7 +1550,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<TCTeamInfo> te
if( maxPriority == team->getPriority() || lastPriority > maxPriority ) {
RelocateShard rs;
rs.keys = shards[i];
rs.priority = maxPriority;
rs.priority = std::max(maxPriority,maxOtherPriority);
self->output.send(rs);
if(g_random->random01() < 0.01) {
@@ -1580,10 +1593,10 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<TCTeamInfo> te
}
}
ACTOR Future<Void> trackExcludedServers( DDTeamCollection *self, Database cx ) {
ACTOR Future<Void> trackExcludedServers( DDTeamCollection* self ) {
loop {
// Fetch the list of excluded servers
state Transaction tr(cx);
state Transaction tr(self->cx);
state Optional<Value> lastChangeID;
loop {
try {
@@ -1627,7 +1640,7 @@ ACTOR Future<Void> trackExcludedServers( DDTeamCollection *self, Database cx ) {
break;
Void _ = wait( delay( SERVER_KNOBS->SERVER_LIST_DELAY, TaskDataDistribution ) ); // FIXME: make this tr.watch( excludedServersVersionKey ) instead
tr = Transaction(cx);
tr = Transaction(self->cx);
} catch (Error& e) {
Void _ = wait( tr.onError(e) );
}
@@ -1654,11 +1667,11 @@ ACTOR Future<vector<std::pair<StorageServerInterface, ProcessClass>>> getServerL
return results;
}
ACTOR Future<Void> waitServerListChange( DDTeamCollection *self, Database cx, FutureStream<Void> serverRemoved ) {
ACTOR Future<Void> waitServerListChange( DDTeamCollection* self, FutureStream<Void> serverRemoved ) {
state Future<Void> checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY);
state Future<vector<std::pair<StorageServerInterface, ProcessClass>>> serverListAndProcessClasses = Never();
state bool isFetchingResults = false;
state Transaction tr(cx);
state Transaction tr(self->cx);
loop {
try {
choose {
@@ -1692,12 +1705,12 @@ ACTOR Future<Void> waitServerListChange( DDTeamCollection *self, Database cx, Fu
}
}
tr = Transaction(cx);
tr = Transaction(self->cx);
checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY);
}
when( Void _ = waitNext( serverRemoved ) ) {
if( isFetchingResults ) {
tr = Transaction(cx);
tr = Transaction(self->cx);
serverListAndProcessClasses = getServerListAndProcessClasses(&tr);
}
}
@@ -1721,7 +1734,7 @@ ACTOR Future<Void> serverMetricsPolling( TCServerInfo *server) {
}
//Returns the KeyValueStoreType of server if it is different from self->storeType
ACTOR Future<KeyValueStoreType> keyValueStoreTypeTracker(DDTeamCollection *self, TCServerInfo *server) {
ACTOR Future<KeyValueStoreType> keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) {
state KeyValueStoreType type = wait(brokenPromiseToNever(server->lastKnownInterface.getKeyValueStoreType.getReplyWithTaskID<KeyValueStoreType>(TaskDataDistribution)));
if(type == self->configuration.storageServerStoreType && (self->includedDCs.empty() || std::find(self->includedDCs.begin(), self->includedDCs.end(), server->lastKnownInterface.locality.dcId()) != self->includedDCs.end()) )
Void _ = wait(Future<Void>(Never()));
@@ -1750,7 +1763,7 @@ ACTOR Future<Void> removeBadTeams(DDTeamCollection* self) {
}
ACTOR Future<Void> storageServerTracker(
DDTeamCollection *self,
DDTeamCollection* self,
Database cx,
TCServerInfo *server, //This actor is owned by this TCServerInfo
ServerStatusMap *statusMap,
@@ -1944,7 +1957,7 @@ ACTOR Future<Void> storageServerTracker(
}
//Monitor whether or not storage servers are being recruited. If so, then a database cannot be considered quiet
ACTOR Future<Void> monitorStorageServerRecruitment(DDTeamCollection *self) {
ACTOR Future<Void> monitorStorageServerRecruitment(DDTeamCollection* self) {
state bool recruiting = false;
TraceEvent("StorageServerRecruitment", self->masterId)
.detail("State", "Idle")
@@ -1973,7 +1986,7 @@ ACTOR Future<Void> monitorStorageServerRecruitment(DDTeamCollection *self) {
}
}
ACTOR Future<Void> initializeStorage( DDTeamCollection *self, RecruitStorageReply candidateWorker ) {
ACTOR Future<Void> initializeStorage( DDTeamCollection* self, RecruitStorageReply candidateWorker ) {
// SOMEDAY: Cluster controller waits for availability, retry quickly if a server's Locality changes
self->recruitingStream.set(self->recruitingStream.get()+1);
@@ -2021,7 +2034,7 @@ ACTOR Future<Void> initializeStorage( DDTeamCollection *self, RecruitStorageRepl
return Void();
}
ACTOR Future<Void> storageRecruiter( DDTeamCollection *self, Reference<AsyncVar<struct ServerDBInfo>> db ) {
ACTOR Future<Void> storageRecruiter( DDTeamCollection* self, Reference<AsyncVar<struct ServerDBInfo>> db ) {
state Future<RecruitStorageReply> fCandidateWorker;
state RecruitStorageRequest lastRequest;
loop {
@@ -2128,83 +2141,74 @@ ACTOR Future<Void> serverGetTeamRequests(TeamCollectionInterface tci, DDTeamColl
// Keep track of servers and teams -- serves requests for getRandomTeam
ACTOR Future<Void> dataDistributionTeamCollection(
Reference<DDTeamCollection> teamCollection,
std::vector<DDTeamCollection*> teamCollections,
Reference<InitialDataDistribution> initData,
TeamCollectionInterface tci,
Database cx,
Reference<AsyncVar<struct ServerDBInfo>> db,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
MoveKeysLock lock,
PromiseStream<RelocateShard> output,
UID masterId,
DatabaseConfiguration configuration,
std::vector<Optional<Key>> includedDCs,
Optional<std::vector<Optional<Key>>> otherTrackedDCs,
Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> serverChanges,
Future<Void> readyToStart,
Reference<AsyncVar<bool>> zeroHealthyTeams,
bool primary,
Reference<AsyncVar<bool>> processingUnhealthy)
Reference<AsyncVar<struct ServerDBInfo>> db)
{
state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, configuration, includedDCs, otherTrackedDCs, serverChanges, readyToStart, zeroHealthyTeams, primary, processingUnhealthy );
state DDTeamCollection* self = teamCollection.getPtr();
state Future<Void> loggingTrigger = Void();
state PromiseStream<Void> serverRemoved;
state Future<Void> error = actorCollection( self.addActor.getFuture() );
state Future<Void> error = actorCollection( self->addActor.getFuture() );
self->teamCollections = teamCollections;
try {
Void _ = wait( self.init( &self, initData ) );
Void _ = wait( DDTeamCollection::init( self, initData ) );
initData = Reference<InitialDataDistribution>();
self.addActor.send(serverGetTeamRequests(tci, &self));
self->addActor.send(serverGetTeamRequests(tci, self));
TraceEvent("DDTeamCollectionBegin", masterId).detail("Primary", primary);
Void _ = wait( readyToStart || error );
TraceEvent("DDTeamCollectionReadyToStart", masterId).detail("Primary", primary);
TraceEvent("DDTeamCollectionBegin", self->masterId).detail("Primary", self->primary);
Void _ = wait( self->readyToStart || error );
TraceEvent("DDTeamCollectionReadyToStart", self->masterId).detail("Primary", self->primary);
self.addActor.send(storageRecruiter( &self, db ));
self.addActor.send(monitorStorageServerRecruitment( &self ));
self.addActor.send(waitServerListChange( &self, cx, serverRemoved.getFuture() ));
self.addActor.send(trackExcludedServers( &self, cx ));
self->addActor.send(storageRecruiter( self, db ));
self->addActor.send(monitorStorageServerRecruitment( self ));
self->addActor.send(waitServerListChange( self, serverRemoved.getFuture() ));
self->addActor.send(trackExcludedServers( self ));
if(self.badTeamRemover.isReady()) {
self.badTeamRemover = removeBadTeams(&self);
self.addActor.send(self.badTeamRemover);
if(self->badTeamRemover.isReady()) {
self->badTeamRemover = removeBadTeams(self);
self->addActor.send(self->badTeamRemover);
}
if(includedDCs.size()) {
self.addActor.send(updateReplicasKey(&self, includedDCs[0]));
if(self->includedDCs.size()) {
self->addActor.send(updateReplicasKey(self, self->includedDCs[0]));
}
// SOMEDAY: Monitor FF/serverList for (new) servers that aren't in allServers and add or remove them
loop choose {
when( UID removedServer = waitNext( self.removedServers.getFuture() ) ) {
when( UID removedServer = waitNext( self->removedServers.getFuture() ) ) {
TEST(true); // Storage server removed from database
self.removeServer( removedServer );
self->removeServer( removedServer );
serverRemoved.send( Void() );
self.restartRecruiting.trigger();
self->restartRecruiting.trigger();
}
when( Void _ = wait( self.zeroHealthyTeams->onChange() ) ) {
if(self.zeroHealthyTeams->get()) {
self.restartRecruiting.trigger();
self.noHealthyTeams();
when( Void _ = wait( self->zeroHealthyTeams->onChange() ) ) {
if(self->zeroHealthyTeams->get()) {
self->restartRecruiting.trigger();
self->noHealthyTeams();
}
}
when( Void _ = wait( loggingTrigger ) ) {
int highestPriority = 0;
for(auto it : self.priority_teams) {
for(auto it : self->priority_teams) {
if(it.second > 0) {
highestPriority = std::max(highestPriority, it.first);
}
}
TraceEvent("TotalDataInFlight", masterId).detail("Primary", self.primary).detail("TotalBytes", self.getDebugTotalDataInFlight()).detail("UnhealthyServers", self.unhealthyServers)
.detail("HighestPriority", highestPriority).trackLatest( self.primary ? "TotalDataInFlight" : "TotalDataInFlightRemote" );
TraceEvent("TotalDataInFlight", self->masterId).detail("Primary", self->primary).detail("TotalBytes", self->getDebugTotalDataInFlight()).detail("UnhealthyServers", self->unhealthyServers)
.detail("HighestPriority", highestPriority).trackLatest( self->primary ? "TotalDataInFlight" : "TotalDataInFlightRemote" );
loggingTrigger = delay( SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL );
}
when( Void _ = wait( self.serverTrackerErrorOut.getFuture() ) ) {} // Propagate errors from storageServerTracker
when( Void _ = wait( self->serverTrackerErrorOut.getFuture() ) ) {} // Propagate errors from storageServerTracker
when( Void _ = wait( error ) ) {}
}
} catch (Error& e) {
if (e.code() != error_code_movekeys_conflict)
TraceEvent(SevError, "DataDistributionTeamCollectionError", masterId).error(e);
TraceEvent(SevError, "DataDistributionTeamCollectionError", self->masterId).error(e);
throw e;
}
}
@@ -2408,6 +2412,8 @@ ACTOR Future<Void> dataDistribution(
if(configuration.usableRegions > 1) {
teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].remoteSrc, false));
}
TraceEvent("DDInitShard").detail("Keys", printable(keys)).detail("PrimarySrc", describe(initData->shards[shard].primarySrc)).detail("RemoteSrc", describe(initData->shards[shard].remoteSrc))
.detail("PrimaryDest", describe(initData->shards[shard].primaryDest)).detail("RemoteDest", describe(initData->shards[shard].remoteDest));
shardsAffectedByTeamFailure->moveShard(keys, teams);
if(initData->shards[shard].hasDest) {
// This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in
@@ -2422,6 +2428,7 @@ ACTOR Future<Void> dataDistribution(
}
vector<TeamCollectionInterface> tcis;
Reference<AsyncVar<bool>> anyZeroHealthyTeams;
vector<Reference<AsyncVar<bool>>> zeroHealthyTeams;
tcis.push_back(TeamCollectionInterface());
@@ -2443,9 +2450,21 @@ ACTOR Future<Void> dataDistribution(
actors.push_back( pollMoveKeysLock(cx, lock) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
vector<Reference<DDTeamCollection>> teamCollections;
teamCollections.push_back( Reference<DDTeamCollection>( new DDTeamCollection(cx, mi.id(), lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) ) );
if (configuration.usableRegions > 1) {
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >>(), readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
teamCollections.push_back( Reference<DDTeamCollection>( new DDTeamCollection(cx, mi.id(), lock, output, shardsAffectedByTeamFailure, configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), serverChanges, readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy) ) );
}
vector<DDTeamCollection*> teamCollectionsPtrs;
for(auto it : teamCollections) {
teamCollectionsPtrs.push_back(it.getPtr());
}
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( teamCollections[0], teamCollectionsPtrs, initData, tcis[0], db ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
if (configuration.usableRegions > 1) {
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( teamCollections[1], teamCollectionsPtrs, initData, tcis[1], db ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
}
actors.push_back(yieldPromiseStream(output.getFuture(), input));
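A note on the ownership change above: DDTeamCollection is now ReferenceCounted, both collections are constructed up front in dataDistribution, and each one receives raw pointers to every collection via teamCollections. The raw cross-links are safe because the enclosing actor holds the Reference<DDTeamCollection> owners for as long as the collection actors run. A self-contained sketch of the pattern (simplified Collection type, with std::shared_ptr standing in for Flow's Reference<>):

// Sketch of the ownership pattern: shared ownership keeps each collection
// alive, while non-owning raw pointers let either region's tracker inspect
// the other's teams.
#include <memory>
#include <vector>

struct Collection {
    std::vector<Collection*> teamCollections;  // non-owning cross-links
};

int main() {
    std::vector<std::shared_ptr<Collection>> owned;
    owned.push_back(std::make_shared<Collection>());  // primary region
    owned.push_back(std::make_shared<Collection>());  // remote (usableRegions > 1)
    std::vector<Collection*> ptrs;
    for (auto& c : owned) ptrs.push_back(c.get());
    for (auto& c : owned) c->teamCollections = ptrs;  // each sees both regions
    return 0;
}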


@@ -290,7 +290,7 @@ ACTOR Future<Void> repairDeadDatacenter(Database cx, Reference<AsyncVar<ServerDB
ASSERT(!primaryDead || !remoteDead);
if(primaryDead || remoteDead) {
TraceEvent(SevWarnAlways, "DisablingFearlessConfiguration").detail("Location", context).detail("Stage", "Repopulate");
TraceEvent(SevWarnAlways, "DisablingFearlessConfiguration").detail("Location", context).detail("Stage", "Repopulate").detail("RemoteDead", remoteDead).detail("PrimaryDead", primaryDead);
g_simulator.usableRegions = 1;
ConfigurationResult::Type _ = wait( changeConfig( cx, (primaryDead ? g_simulator.disablePrimary : g_simulator.disableRemote) + " repopulate_anti_quorum=1", true ) );
while( dbInfo->get().recoveryState < RecoveryState::STORAGE_RECOVERED ) {