fix: peek all possible txsTags which could have been used by old log sets

This commit is contained in:
Evan Tschannen 2019-06-27 23:39:19 -07:00
parent 235697f688
commit 2113d6d01e
1 changed files with 25 additions and 8 deletions

View File

@ -746,9 +746,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
TraceEvent("TLogPeekTxs", dbgid).detail("Begin", begin).detail("End", end).detail("LocalEnd", localEnd).detail("PeekLocality", peekLocality);
int maxTxsTags = tLogs[0]->logServers.size();
for(auto& it : oldLogData) {
maxTxsTags = std::max<int>(maxTxsTags, it.tLogs[0]->logServers.size());
}
if(peekLocality < 0 || localEnd == invalidVersion || localEnd <= begin) {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
for(int i = 0; i < tLogs[0]->logServers.size(); i++) {
for(int i = 0; i < maxTxsTags; i++) {
cursors.push_back(peekAll(dbgid, begin, end, Tag(tagLocalityTxs, i), true));
}
//SOMEDAY: remove once upgrades from 6.2 are no longer supported
@ -760,7 +765,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
try {
if(localEnd >= end) {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
for(int i = 0; i < tLogs[0]->logServers.size(); i++) {
for(int i = 0; i < maxTxsTags; i++) {
cursors.push_back(peekLocal(dbgid, Tag(tagLocalityTxs, i), begin, end, true, peekLocality));
}
//SOMEDAY: remove once upgrades from 6.2 are no longer supported
@ -776,7 +781,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector< Reference<ILogSystem::IPeekCursor> > localCursors;
std::vector< Reference<ILogSystem::IPeekCursor> > allCursors;
for(int i = 0; i < tLogs[0]->logServers.size(); i++) {
for(int i = 0; i < maxTxsTags; i++) {
localCursors.push_back(peekLocal(dbgid, Tag(tagLocalityTxs, i), begin, localEnd, true, peekLocality));
allCursors.push_back(peekAll(dbgid, localEnd, end, Tag(tagLocalityTxs, i), true));
}
@ -792,7 +797,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
} catch( Error& e ) {
if(e.code() == error_code_worker_removed) {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
for(int i = 0; i < tLogs[0]->logServers.size(); i++) {
for(int i = 0; i < maxTxsTags; i++) {
cursors.push_back(peekAll(dbgid, begin, end, Tag(tagLocalityTxs, i), true));
}
//SOMEDAY: remove once upgrades from 6.2 are no longer supported
@ -1781,7 +1786,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
if(oldLogSystem->tLogs.size()) {
for(int i = -1; i < oldLogSystem->tLogs[0]->logServers.size(); i++) {
int maxTxsTags = oldLogSystem->tLogs[0]->logServers.size();
for(auto& it : oldLogSystem->oldLogData) {
maxTxsTags = std::max<int>(maxTxsTags, it.tLogs[0]->logServers.size());
}
for(int i = -1; i < maxTxsTags; i++) {
Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i);
locations.clear();
logSet->getPushLocations( vector<Tag>(1, tag), locations, 0 );
@ -1867,6 +1876,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state RegionInfo region = configuration.getRegion(recr.dcId);
state int maxTxsTags = 0;
if(oldLogSystem->tLogs.size()) {
maxTxsTags = oldLogSystem->tLogs[0]->logServers.size();
for(auto& it : oldLogSystem->oldLogData) {
maxTxsTags = std::max<int>(maxTxsTags, it.tLogs[0]->logServers.size());
}
}
if(region.satelliteTLogReplicationFactor > 0) {
logSystem->tLogs.emplace_back(new LogSet());
if(recr.satelliteFallback) {
@ -1891,7 +1908,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->tLogs[1]->logServers.resize( recr.satelliteTLogs.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
logSystem->tLogs[1]->updateLocalitySet(logSystem->tLogs[1]->tLogLocalities);
logSystem->tLogs[1]->populateSatelliteTagLocations(logSystem->logRouterTags,oldLogSystem->logRouterTags,recr.tLogs.size(),oldLogSystem->tLogs.size() ? oldLogSystem->tLogs[0]->logServers.size() : 0);
logSystem->tLogs[1]->populateSatelliteTagLocations(logSystem->logRouterTags,oldLogSystem->logRouterTags,recr.tLogs.size(),maxTxsTags);
logSystem->expectedLogSets++;
}
@ -1972,7 +1989,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
reqs[ logSystem->tLogs[0]->bestLocationFor( tag ) ].recoverTags.push_back( tag );
}
if(oldLogSystem->tLogs.size()) {
for(int i = -1; i < oldLogSystem->tLogs[0]->logServers.size(); i++) {
for(int i = -1; i < maxTxsTags; i++) {
Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i);
locations.clear();
logSystem->tLogs[0]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
@ -2022,7 +2039,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
sreqs[ loc ].recoverTags.push_back( tag );
}
if(oldLogSystem->tLogs.size()) {
for(int i = -1; i < oldLogSystem->tLogs[0]->logServers.size(); i++) {
for(int i = -1; i < maxTxsTags; i++) {
Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i);
locations.clear();
logSystem->tLogs[1]->getPushLocations( vector<Tag>(1, tag), locations, 0 );