fix: parallel get more needs to verify the begin version matches the end of the previous request, because when a peek cursor expires we lose all history, so the same sequence number could start at different versions

This commit is contained in:
Evan Tschannen 2018-06-25 11:15:49 -07:00
parent 398497f5c3
commit 2ec8744ab3
4 changed files with 10 additions and 1 deletions

View File

@ -143,6 +143,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
}
loop {
state Version expectedBegin = self->messageVersion.version;
try {
while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
@ -150,6 +151,10 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
choose {
when( TLogPeekReply res = wait( self->interf->get().present() ? self->futureResults.front() : Never() ) ) {
if(res.begin.get() != expectedBegin) {
throw timed_out();
}
expectedBegin = res.end;
self->futureResults.pop_front();
self->results = res;
if(res.popped.present())

View File

@ -975,6 +975,7 @@ namespace oldTLog {
} else {
sequenceData.send(reply.end);
}
reply.begin = req.begin;
}
req.reply.send( reply );

View File

@ -140,10 +140,11 @@ struct TLogPeekReply {
Optional<Version> popped;
Version maxKnownVersion;
Version minKnownCommittedVersion;
Optional<Version> begin;
template <class Ar>
void serialize(Ar& ar) {
ar & arena & messages & end & popped & maxKnownVersion & minKnownCommittedVersion;
ar & arena & messages & end & popped & maxKnownVersion & minKnownCommittedVersion & begin;
}
};

View File

@ -1007,6 +1007,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
} else {
sequenceData.send(rep.end);
}
rep.begin = req.begin;
}
req.reply.send( rep );
@ -1068,6 +1069,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
} else {
sequenceData.send(reply.end);
}
reply.begin = req.begin;
}
req.reply.send( reply );