Fix: several empty version issues with large single-version change feed entries

This commit is contained in:
Josh Slocum 2021-12-02 09:51:24 -06:00
parent 0b9505c52a
commit 85c6e30aec
2 changed files with 27 additions and 5 deletions

View File

@ -6897,6 +6897,15 @@ ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
if (rep.mutations[resultLoc].version >= nextVersion) {
results.send(rep.mutations[resultLoc]);
} else {
// TODO REMOVE eventually, useful for debugging for now
if (!rep.mutations[resultLoc].mutations.empty()) {
printf(
"non-empty mutations (%d), but versions out of order from %s! mv=%lld, nv=%lld\n",
rep.mutations.size(),
interf.id().toString().substr(0, 4).c_str(),
rep.mutations[resultLoc].version,
nextVersion);
}
ASSERT(rep.mutations[resultLoc].mutations.empty());
}
resultLoc++;
@ -6930,6 +6939,8 @@ ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
}
}
} catch (Error& e) {
// TODO REMOVE eventually, useful for debugging for now
printf("NAS: CFError %s\n", e.name());
if (e.code() == error_code_actor_cancelled) {
throw;
}

View File

@ -1740,6 +1740,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
state ChangeFeedStreamReply memoryReply;
state int remainingLimitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
state int remainingDurableBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
state Version startVersion = data->version.get();
if (data->version.get() < req.begin) {
wait(data->version.whenAtLeast(req.begin));
@ -1835,7 +1836,10 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
}
}
return std::make_pair(reply, remainingLimitBytes > 0 && remainingDurableBytes > 0);
// If the SS's version advanced at all during any of the waits, the read from memory may have missed some mutations,
// so gotAll can only be true if data->version didn't change over the course of this actor
return std::make_pair(reply,
remainingLimitBytes > 0 && remainingDurableBytes > 0 && data->version.get() == startVersion);
}
ACTOR Future<Void> localChangeFeedStream(StorageServer* data,
@ -1887,7 +1891,7 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
try {
loop {
Future<Void> onReady = req.reply.onReady();
if (atLatest && !onReady.isReady()) {
if (atLatest && !onReady.isReady() && !removeUID) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][streamUID] =
blockedVersion.present() ? blockedVersion.get() : data->prevVersion;
removeUID = true;
@ -1908,18 +1912,25 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
if (!atLatest && gotAll) {
atLatest = true;
}
auto& clientVersions = data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()];
Version minVersion = removeUID ? data->version.get() : data->prevVersion;
if (removeUID) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()].erase(streamUID);
removeUID = false;
if (gotAll) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()].erase(streamUID);
removeUID = false;
} else {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][streamUID] =
feedReply.mutations.back().version;
}
}
for (auto& it : clientVersions) {
minVersion = std::min(minVersion, it.second);
}
feedReply.atLatestVersion = atLatest;
feedReply.minStreamVersion = gotAll ? minVersion : feedReply.mutations.back().version;
feedReply.minStreamVersion = minVersion;
req.reply.send(feedReply);
if (feedReply.mutations.back().version == req.end - 1) {
req.reply.sendError(end_of_stream());