fix: log router start version is based on the start version of the local logs

This commit is contained in:
Evan Tschannen 2018-04-12 18:14:23 -07:00
parent 3b7e4410cf
commit c589630e53
1 changed files with 10 additions and 17 deletions

View File

@ -415,18 +415,18 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(bestSet == -1) {
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
if(oldLogData.size() == 0 || begin >= tLogs[bestSet]->startVersion) {
if(oldLogData.size() == 0 || begin >= tLogs[0]->startVersion) {
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, begin, getPeekEnd(), false, std::vector<LocalityData>(), IRepPolicyRef(), 0 ) );
} else {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, tLogs[bestSet]->startVersion, getPeekEnd(), false, std::vector<LocalityData>(), IRepPolicyRef(), 0 ) ) );
Version lastBegin = tLogs[bestSet]->startVersion;
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, tLogs[0]->startVersion, getPeekEnd(), false, std::vector<LocalityData>(), IRepPolicyRef(), 0 ) ) );
Version lastBegin = tLogs[0]->startVersion;
for(int i = 0; i < oldLogData.size() && begin < lastBegin; i++) {
int bestOldSet = -1;
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
if(oldLogData[i].tLogs[t]->logRouters.size()) {
ASSERT(bestSet == -1);
ASSERT(bestOldSet == -1);
bestOldSet = t;
}
}
@ -434,7 +434,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
Version thisBegin = std::max(oldLogData[i].tLogs[bestOldSet]->startVersion, begin);
Version thisBegin = std::max(oldLogData[i].tLogs[0]->startVersion, begin);
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( oldLogData[i].tLogs[bestOldSet]->logRouters, -1, (int)oldLogData[i].tLogs[bestOldSet]->logRouters.size(), tag,
thisBegin, lastBegin, false, std::vector<LocalityData>(), IRepPolicyRef(), 0 ) ) );
epochEnds.push_back(LogMessageVersion(lastBegin));
@ -1355,21 +1355,18 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
bool found = false;
Version ver = 0;
for(auto& tLogs : self->tLogs) {
if(tLogs->locality == locality) {
found = true;
}
if(tLogs->isLocal && tLogs->logServers.size()) {
ver = std::max(startVersion, tLogs->startVersion);
}
tLogs->logRouters.clear();
}
if(!found) {
Reference<LogSet> newLogSet( new LogSet() );
newLogSet->locality = locality;
newLogSet->startVersion = ver;
newLogSet->startVersion = self->tLogs[0]->startVersion;
newLogSet->isLocal = false;
self->tLogs.push_back(newLogSet);
}
@ -1383,7 +1380,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.recoveryCount = recoveryCount;
req.routerTag = Tag(tagLocalityLogRouter, i);
req.logSet = logSet;
req.startVersion = ver;
req.startVersion = self->tLogs[0]->startVersion;
auto reply = transformErrors( throwErrorOr( workers[nextRouter].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() );
logRouterInitializationReplies.back().push_back( reply );
allReplies.push_back( reply );
@ -1399,21 +1396,17 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
bool found = false;
Version ver = 0;
for(auto& tLogs : old.tLogs) {
if(tLogs->locality == locality) {
found = true;
}
if(tLogs->isLocal && tLogs->logServers.size()) {
ver = std::max(startVersion, tLogs->startVersion);
}
tLogs->logRouters.clear();
}
if(!found) {
Reference<LogSet> newLogSet( new LogSet() );
newLogSet->locality = locality;
newLogSet->startVersion = ver;
newLogSet->startVersion = old.tLogs[0]->startVersion;
old.tLogs.push_back(newLogSet);
}
@ -1426,7 +1419,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.recoveryCount = recoveryCount;
req.routerTag = Tag(tagLocalityLogRouter, i);
req.logSet = logSet;
req.startVersion = ver;
req.startVersion = old.tLogs[0]->startVersion;
auto reply = transformErrors( throwErrorOr( workers[nextRouter].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() );
logRouterInitializationReplies.back().push_back( reply );
allReplies.push_back( reply );