fix: peek local should not call peek all, because it is possible to still peek from remote log sets after a special tag
This commit is contained in:
parent
7892da032f
commit
50f481b149
|
@ -417,7 +417,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
return minVersionWhenReady( waitForAll(quorumResults), allReplies);
|
||||
}
|
||||
|
||||
Reference<IPeekCursor> peekAll( UID dbgid, Version begin, Version end, Tag tag, bool parallelGetMore, bool throwIfDead ) {
|
||||
Reference<IPeekCursor> peekAll( UID dbgid, Version begin, Version end, Tag tag, bool parallelGetMore ) {
|
||||
int bestSet = 0;
|
||||
std::vector<Reference<LogSet>> localSets;
|
||||
Version lastBegin = 0;
|
||||
|
@ -458,11 +458,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
break;
|
||||
}
|
||||
TraceEvent("TLogPeekAllDead", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size());
|
||||
if(throwIfDead) {
|
||||
throw worker_removed();
|
||||
} else {
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
|
||||
int bestOldSet = 0;
|
||||
|
@ -489,11 +485,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
i++;
|
||||
continue;
|
||||
}
|
||||
if(throwIfDead) {
|
||||
throw worker_removed();
|
||||
} else {
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
if(thisSpecial) {
|
||||
foundSpecial = true;
|
||||
|
@ -587,7 +579,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
if(tag.locality == tagLocalityRemoteLog) {
|
||||
return peekRemote(dbgid, begin, tag, parallelGetMore);
|
||||
} else {
|
||||
return peekAll(dbgid, begin, getPeekEnd(), tag, parallelGetMore, false);
|
||||
return peekAll(dbgid, begin, getPeekEnd(), tag, parallelGetMore);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -672,35 +664,35 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
|
||||
int bestOldSet = -1;
|
||||
logCount = 0;
|
||||
bool nextFoundSpecial = false;
|
||||
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
|
||||
if(oldLogData[i].tLogs[t]->logServers.size() && oldLogData[i].tLogs[t]->locality != tagLocalitySatellite) {
|
||||
logCount++;
|
||||
}
|
||||
if(oldLogData[i].tLogs[t]->logServers.size() && (oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded || oldLogData[i].tLogs[t]->locality == peekLocality || peekLocality == tagLocalityUpgraded)) {
|
||||
if( oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded ) {
|
||||
foundSpecial = true;
|
||||
nextFoundSpecial = true;
|
||||
}
|
||||
if(foundSpecial && !oldLogData[i].tLogs[t]->isLocal) {
|
||||
TraceEvent("TLogPeekLocalRemoteBeforeSpecial", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size()).detail("Idx", i);
|
||||
throw worker_removed();
|
||||
}
|
||||
bestOldSet = t;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if(foundSpecial) {
|
||||
TraceEvent("TLogPeekLocalFoundSpecial", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end);
|
||||
cursors.push_back(peekAll(dbgid, begin, std::min(lastBegin, end), tag, useMergePeekCursors, !useMergePeekCursors));
|
||||
epochEnds.push_back(LogMessageVersion(std::min(lastBegin, end)));
|
||||
break;
|
||||
}
|
||||
|
||||
if(bestOldSet == -1) {
|
||||
if(oldLogData[i].logRouterTags == 0 || logCount > 1) {
|
||||
TraceEvent("TLogPeekLocalNoLogRouterTags", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size()).detail("Idx", i).detail("LogRouterTags", oldLogData[i].logRouterTags).detail("LogCount", logCount);
|
||||
TraceEvent("TLogPeekLocalNoBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size()).detail("Idx", i).detail("LogRouterTags", oldLogData[i].logRouterTags).detail("LogCount", logCount).detail("FoundSpecial", foundSpecial);
|
||||
if(oldLogData[i].logRouterTags == 0 || logCount > 1 || foundSpecial) {
|
||||
throw worker_removed();
|
||||
}
|
||||
i++;
|
||||
continue;
|
||||
}
|
||||
|
||||
foundSpecial = nextFoundSpecial;
|
||||
|
||||
Version thisBegin = std::max(oldLogData[i].tLogs[bestOldSet]->startVersion, begin);
|
||||
if(thisBegin < lastBegin) {
|
||||
if(thisBegin < end) {
|
||||
|
@ -723,7 +715,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
Version end = getEnd();
|
||||
TraceEvent("TLogPeekSpecial", dbgid).detail("Begin", begin).detail("End", end).detail("LocalEnd", localEnd).detail("PeekLocality", peekLocality);
|
||||
if(localEnd == invalidVersion || localEnd <= begin) {
|
||||
return peekAll(dbgid, begin, end, tag, true, false);
|
||||
return peekAll(dbgid, begin, end, tag, true);
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -736,13 +728,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
|
||||
cursors.resize(2);
|
||||
cursors[1] = peekLocal(dbgid, tag, begin, localEnd, true, peekLocality);
|
||||
cursors[0] = peekAll(dbgid, localEnd, end, tag, true, false);
|
||||
cursors[0] = peekAll(dbgid, localEnd, end, tag, true);
|
||||
epochEnds.push_back(LogMessageVersion(localEnd));
|
||||
|
||||
return Reference<ILogSystem::MultiCursor>( new ILogSystem::MultiCursor(cursors, epochEnds) );
|
||||
} catch( Error& e ) {
|
||||
if(e.code() == error_code_worker_removed) {
|
||||
return peekAll(dbgid, begin, end, tag, true, false);
|
||||
return peekAll(dbgid, begin, end, tag, true);
|
||||
}
|
||||
throw;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue