do not pop tags from logs that are not indexing that tag
This commit is contained in:
parent
fbb3f85c74
commit
1796e00149
|
@ -812,12 +812,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
}
|
||||
ASSERT(popLocality == tagLocalityInvalid);
|
||||
for(auto& t : tLogs) {
|
||||
for(auto& log : t->logServers) {
|
||||
Version prev = outstandingPops[std::make_pair(log->get().id(),tag)].first;
|
||||
if (prev < upTo)
|
||||
outstandingPops[std::make_pair(log->get().id(),tag)] = std::make_pair(upTo, knownCommittedVersion);
|
||||
if (prev == 0)
|
||||
actors.add( popFromLog( this, log, tag, 1.0 ) ); //< FIXME: knob
|
||||
if(t->locality == tagLocalitySpecial || t->locality == tag.locality || tag.locality < 0) {
|
||||
for(auto& log : t->logServers) {
|
||||
Version prev = outstandingPops[std::make_pair(log->get().id(),tag)].first;
|
||||
if (prev < upTo)
|
||||
outstandingPops[std::make_pair(log->get().id(),tag)] = std::make_pair(upTo, knownCommittedVersion);
|
||||
if (prev == 0)
|
||||
actors.add( popFromLog( this, log, tag, 1.0 ) ); //< FIXME: knob
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1548,6 +1550,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logRouterInitializationReplies.push_back( transformErrors( throwErrorOr( remoteWorkers.logRouters[i%remoteWorkers.logRouters.size()].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
|
||||
}
|
||||
|
||||
std::vector<Tag> localTags;
|
||||
for(auto& tag : allTags) {
|
||||
if(remoteLocality == tagLocalitySpecial || remoteLocality == tag.locality || tag.locality < 0) {
|
||||
localTags.push_back(tag);
|
||||
}
|
||||
}
|
||||
|
||||
state vector<Future<TLogInterface>> remoteTLogInitializationReplies;
|
||||
vector< InitializeTLogRequest > remoteTLogReqs( remoteWorkers.remoteTLogs.size() );
|
||||
for( int i = 0; i < remoteWorkers.remoteTLogs.size(); i++ ) {
|
||||
|
@ -1561,7 +1570,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
req.remoteTag = Tag(tagLocalityRemoteLog, i);
|
||||
req.locality = remoteLocality;
|
||||
req.isPrimary = false;
|
||||
req.allTags = allTags;
|
||||
req.allTags = localTags;
|
||||
req.startVersion = logSet->startVersion;
|
||||
req.logRouterTags = 0;
|
||||
}
|
||||
|
@ -1691,9 +1700,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
oldLogSystem->logSystemConfigChanged.trigger();
|
||||
}
|
||||
|
||||
std::vector<Tag> localTags;
|
||||
for(auto& tag : allTags) {
|
||||
if(primaryLocality == tagLocalitySpecial || primaryLocality == tag.locality || tag.locality < 0) {
|
||||
localTags.push_back(tag);
|
||||
}
|
||||
}
|
||||
|
||||
state vector<Future<TLogInterface>> initializationReplies;
|
||||
vector< InitializeTLogRequest > reqs( recr.tLogs.size() );
|
||||
|
||||
for( int i = 0; i < recr.tLogs.size(); i++ ) {
|
||||
InitializeTLogRequest &req = reqs[i];
|
||||
req.recruitmentID = logSystem->recruitmentID;
|
||||
|
@ -1705,7 +1720,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
req.locality = primaryLocality;
|
||||
req.remoteTag = Tag(tagLocalityRemoteLog, i);
|
||||
req.isPrimary = true;
|
||||
req.allTags = allTags;
|
||||
req.allTags = localTags;
|
||||
req.startVersion = logSystem->tLogs[0]->startVersion;
|
||||
req.logRouterTags = logSystem->logRouterTags;
|
||||
}
|
||||
|
@ -1719,7 +1734,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
reqs[ logSystem->tLogs[0]->bestLocationFor( tag ) ].recoverTags.push_back( tag );
|
||||
}
|
||||
std::vector<int> locations;
|
||||
for( Tag tag : allTags ) {
|
||||
for( Tag tag : localTags ) {
|
||||
locations.clear();
|
||||
logSystem->tLogs[0]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
|
||||
for(int loc : locations)
|
||||
|
@ -1732,9 +1747,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
state std::vector<Future<Void>> recoveryComplete;
|
||||
|
||||
if(region.satelliteTLogReplicationFactor > 0) {
|
||||
std::vector<Tag> satelliteTags;
|
||||
satelliteTags.push_back(txsTag);
|
||||
|
||||
state vector<Future<TLogInterface>> satelliteInitializationReplies;
|
||||
vector< InitializeTLogRequest > sreqs( recr.satelliteTLogs.size() );
|
||||
|
||||
for( int i = 0; i < recr.satelliteTLogs.size(); i++ ) {
|
||||
InitializeTLogRequest &req = sreqs[i];
|
||||
req.recruitmentID = logSystem->recruitmentID;
|
||||
|
@ -1746,7 +1763,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
req.locality = tagLocalityInvalid;
|
||||
req.remoteTag = Tag();
|
||||
req.isPrimary = true;
|
||||
req.allTags = allTags;
|
||||
req.allTags = satelliteTags;
|
||||
req.startVersion = oldLogSystem->knownCommittedVersion + 1;
|
||||
req.logRouterTags = logSystem->logRouterTags;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue