fix: sending to mutations can cause whenAtLeast to update lastReturnedVersion before we have a chance to

This commit is contained in:
Evan Tschannen 2021-12-05 16:16:20 -08:00
parent 13ef5afb9c
commit 02c650c6e7
1 changed files with 12 additions and 3 deletions

View File

@ -7269,8 +7269,11 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
if (nextStream.next.version != checkVersion) {
if (nextOut.size()) {
*begin = checkVersion + 1;
ASSERT(nextOut.back().version >= results->lastReturnedVersion.get());
results->mutations.send(nextOut);
results->lastReturnedVersion.set(nextOut.back().version);
if (nextOut.back().version > results->lastReturnedVersion.get()) {
results->lastReturnedVersion.set(nextOut.back().version);
}
nextOut = Standalone<VectorRef<MutationsAndVersionRef>>();
}
checkVersion = nextStream.next.version;
@ -7295,8 +7298,11 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
}
}
if (nextOut.size()) {
ASSERT(nextOut.back().version >= results->lastReturnedVersion.get());
results->mutations.send(nextOut);
results->lastReturnedVersion.set(nextOut.back().version);
if (nextOut.back().version > results->lastReturnedVersion.get()) {
results->lastReturnedVersion.set(nextOut.back().version);
}
}
throw end_of_stream();
}
@ -7437,9 +7443,12 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
when(wait(cx->connectionFileChanged())) { break; }
when(ChangeFeedStreamReply rep = waitNext(replyStream.getFuture())) {
begin = rep.mutations.back().version + 1;
ASSERT(rep.mutations.back().version >= results->lastReturnedVersion.get());
results->mutations.send(
Standalone<VectorRef<MutationsAndVersionRef>>(rep.mutations, rep.arena));
results->lastReturnedVersion.set(rep.mutations.back().version);
if (rep.mutations.back().version > results->lastReturnedVersion.get()) {
results->lastReturnedVersion.set(rep.mutations.back().version);
}
if (!atLatest && rep.atLatestVersion) {
atLatest = true;
results->notAtLatest.set(0);