Updating SS desired every mutation to avoid CF merge deadlock

This commit is contained in:
Josh Slocum 2022-03-10 20:11:37 -06:00
parent 479ac313ca
commit 2a506f2dff
1 changed files with 10 additions and 10 deletions

View File

@ -7608,33 +7608,33 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
wait(results.onEmpty());
if (rep.mutations[resultLoc].version >= nextVersion) {
results.send(rep.mutations[resultLoc]);
// check refresh.canBeSet so that, if we are killed after calling one of these callbacks, we
// just skip to the next wait and get actor_cancelled
// FIXME: this is somewhat expensive to do every mutation.
for (auto& it : feedData->storageData) {
if (refresh.canBeSet() && rep.mutations[resultLoc].version > it->desired.get()) {
it->desired.set(rep.mutations[resultLoc].version);
}
}
} else {
ASSERT(rep.mutations[resultLoc].mutations.empty());
}
resultLoc++;
}
// if we got the empty version that went backwards, don't decrease nextVersion
if (rep.mutations.back().version + 1 > nextVersion) {
nextVersion = rep.mutations.back().version + 1;
}
// check refresh.canBeSet so that, if we are killed after calling one of these callbacks, we just
// skip to the next wait and get actor_cancelled
if (refresh.canBeSet() && !atLatestVersion && rep.atLatestVersion) {
atLatestVersion = true;
feedData->notAtLatest.set(feedData->notAtLatest.get() - 1);
}
if (refresh.canBeSet() && rep.minStreamVersion > storageData->version.get()) {
storageData->version.set(rep.minStreamVersion);
}
for (auto& it : feedData->storageData) {
if (refresh.canBeSet() && rep.mutations.back().version > it->desired.get()) {
it->desired.set(rep.mutations.back().version);
}
}
}
when(wait(atLatestVersion && replyStream.isEmpty() && results.isEmpty()
? storageData->version.whenAtLeast(nextVersion)