When reloading one cursor in a merge cursor, top off the other cursors as well.
This commit is contained in:
parent
99843bd4ba
commit
324289039a
|
@ -135,6 +135,8 @@ void ILogSystem::ServerPeekCursor::advanceTo(LogMessageVersion n) {
|
|||
|
||||
ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self, TaskPriority taskID ) {
|
||||
if( !self->interf || self->messageVersion >= self->end ) {
|
||||
if( self->hasMessage() )
|
||||
return Void();
|
||||
wait( Future<Void>(Never()));
|
||||
throw internal_error();
|
||||
}
|
||||
|
@ -154,6 +156,9 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
|
|||
return Void();
|
||||
}
|
||||
|
||||
if( self->hasMessage() )
|
||||
return Void();
|
||||
|
||||
choose {
|
||||
when( TLogPeekReply res = wait( self->interf->get().present() ? self->futureResults.front() : Never() ) ) {
|
||||
if(res.begin.get() != expectedBegin) {
|
||||
|
@ -236,7 +241,7 @@ ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self, TaskPr
|
|||
|
||||
Future<Void> ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) {
|
||||
//TraceEvent("SPC_GetMore", randomID).detail("HasMessage", hasMessage()).detail("More", !more.isValid() || more.isReady()).detail("MessageVersion", messageVersion.toString()).detail("End", end.toString());
|
||||
if( hasMessage() )
|
||||
if( hasMessage() && !parallelGetMore )
|
||||
return Void();
|
||||
if( !more.isValid() || more.isReady() ) {
|
||||
if (parallelGetMore || onlySpilled || futureResults.size()) {
|
||||
|
|
Loading…
Reference in New Issue