Move the check to a better location.

This way, we avoid some ID randomness, and also avoid the potential for
resetting the randomID and sequence without clearing out the future
vector.
This commit is contained in:
Alex Miller 2019-10-15 00:27:59 -07:00
parent 1cb311fcb8
commit 858e4e5900
1 changed files with 1 additions and 7 deletions

View File

@ -151,7 +151,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) ); self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
} }
if (self->sequence == std::numeric_limits<decltype(self->sequence)>::max()) { if (self->sequence == std::numeric_limits<decltype(self->sequence)>::max()) {
throw timed_out(); throw operation_obsolete();
} }
} else if (self->futureResults.size() == 1) { } else if (self->futureResults.size() == 1) {
self->randomID = deterministicRandom()->randomUniqueID(); self->randomID = deterministicRandom()->randomUniqueID();
@ -172,12 +172,6 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
self->futureResults.pop_front(); self->futureResults.pop_front();
self->results = res; self->results = res;
self->onlySpilled = res.onlySpilled; self->onlySpilled = res.onlySpilled;
if(!res.onlySpilled && !self->parallelGetMore) {
// After we burn though the rest of the futures, then we should use a new
// randomID and sequence the next time we return to doing parallel peeks.
self->randomID = deterministicRandom()->randomUniqueID();
self->sequence = 0;
}
if(res.popped.present()) if(res.popped.present())
self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version ); self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version );
self->rd = ArenaReader( self->results.arena, self->results.messages, Unversioned() ); self->rd = ArenaReader( self->results.arena, self->results.messages, Unversioned() );