diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 5ab886b015..10980963e1 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -812,12 +812,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServers) { - Version prev = outstandingPops[std::make_pair(log->get().id(),tag)].first; - if (prev < upTo) - outstandingPops[std::make_pair(log->get().id(),tag)] = std::make_pair(upTo, knownCommittedVersion); - if (prev == 0) - actors.add( popFromLog( this, log, tag, 1.0 ) ); //< FIXME: knob + if(t->locality == tagLocalitySpecial || t->locality == tag.locality || tag.locality < 0) { + for(auto& log : t->logServers) { + Version prev = outstandingPops[std::make_pair(log->get().id(),tag)].first; + if (prev < upTo) + outstandingPops[std::make_pair(log->get().id(),tag)] = std::make_pair(upTo, knownCommittedVersion); + if (prev == 0) + actors.add( popFromLog( this, log, tag, 1.0 ) ); //< FIXME: knob + } } } } @@ -1548,6 +1550,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedTLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); } + std::vector localTags; + for(auto& tag : allTags) { + if(remoteLocality == tagLocalitySpecial || remoteLocality == tag.locality || tag.locality < 0) { + localTags.push_back(tag); + } + } + state vector> remoteTLogInitializationReplies; vector< InitializeTLogRequest > remoteTLogReqs( remoteWorkers.remoteTLogs.size() ); for( int i = 0; i < remoteWorkers.remoteTLogs.size(); i++ ) { @@ -1561,7 +1570,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedstartVersion; req.logRouterTags = 0; } @@ -1691,9 +1700,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogSystemConfigChanged.trigger(); } + std::vector localTags; + for(auto& tag : allTags) { + if(primaryLocality == tagLocalitySpecial || primaryLocality == tag.locality || tag.locality < 0) { + localTags.push_back(tag); + } + } + state vector> initializationReplies; vector< InitializeTLogRequest > reqs( recr.tLogs.size() ); - for( int i = 0; i < recr.tLogs.size(); i++ ) { InitializeTLogRequest &req = reqs[i]; req.recruitmentID = logSystem->recruitmentID; @@ -1705,7 +1720,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs[0]->startVersion; req.logRouterTags = logSystem->logRouterTags; } @@ -1719,7 +1734,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs[0]->bestLocationFor( tag ) ].recoverTags.push_back( tag ); } std::vector locations; - for( Tag tag : allTags ) { + for( Tag tag : localTags ) { locations.clear(); logSystem->tLogs[0]->getPushLocations( vector(1, tag), locations, 0 ); for(int loc : locations) @@ -1732,9 +1747,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> recoveryComplete; if(region.satelliteTLogReplicationFactor > 0) { + std::vector satelliteTags; + satelliteTags.push_back(txsTag); + state vector> satelliteInitializationReplies; vector< InitializeTLogRequest > sreqs( recr.satelliteTLogs.size() ); - for( int i = 0; i < recr.satelliteTLogs.size(); i++ ) { InitializeTLogRequest &req = sreqs[i]; req.recruitmentID = logSystem->recruitmentID; @@ -1746,7 +1763,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedknownCommittedVersion + 1; req.logRouterTags = logSystem->logRouterTags; }