diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 4c7c7cfa5e..b0bdf119cd 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -6897,6 +6897,15 @@ ACTOR Future singleChangeFeedStream(StorageServerInterface interf, if (rep.mutations[resultLoc].version >= nextVersion) { results.send(rep.mutations[resultLoc]); } else { + // TODO REMOVE eventually, useful for debugging for now + if (!rep.mutations[resultLoc].mutations.empty()) { + printf( + "non-empty mutations (%d), but versions out of order from %s! mv=%lld, nv=%lld\n", + rep.mutations.size(), + interf.id().toString().substr(0, 4).c_str(), + rep.mutations[resultLoc].version, + nextVersion); + } ASSERT(rep.mutations[resultLoc].mutations.empty()); } resultLoc++; @@ -6930,6 +6939,8 @@ ACTOR Future singleChangeFeedStream(StorageServerInterface interf, } } } catch (Error& e) { + // TODO REMOVE eventually, useful for debugging for now + printf("NAS: CFError %s\n", e.name()); if (e.code() == error_code_actor_cancelled) { throw; } diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 529d3041d6..cc48631ead 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -1740,6 +1740,7 @@ ACTOR Future> getChangeFeedMutations(Stor state ChangeFeedStreamReply memoryReply; state int remainingLimitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT; state int remainingDurableBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT; + state Version startVersion = data->version.get(); if (data->version.get() < req.begin) { wait(data->version.whenAtLeast(req.begin)); @@ -1835,7 +1836,10 @@ ACTOR Future> getChangeFeedMutations(Stor } } - return std::make_pair(reply, remainingLimitBytes > 0 && remainingDurableBytes > 0); + // If the SS's version advanced at all during any of the waits, the read from memory may have missed some mutations, + // so gotAll can only be true if data->version didn't change over the course of this actor + return std::make_pair(reply, + remainingLimitBytes > 0 && remainingDurableBytes > 0 && data->version.get() == startVersion); } ACTOR Future localChangeFeedStream(StorageServer* data, @@ -1887,7 +1891,7 @@ ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques try { loop { Future onReady = req.reply.onReady(); - if (atLatest && !onReady.isReady()) { + if (atLatest && !onReady.isReady() && !removeUID) { data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][streamUID] = blockedVersion.present() ? blockedVersion.get() : data->prevVersion; removeUID = true; @@ -1908,18 +1912,25 @@ ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques if (!atLatest && gotAll) { atLatest = true; } + auto& clientVersions = data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()]; Version minVersion = removeUID ? data->version.get() : data->prevVersion; if (removeUID) { - data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()].erase(streamUID); - removeUID = false; + if (gotAll) { + data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()].erase(streamUID); + removeUID = false; + } else { + data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][streamUID] = + feedReply.mutations.back().version; + } } for (auto& it : clientVersions) { minVersion = std::min(minVersion, it.second); } feedReply.atLatestVersion = atLatest; - feedReply.minStreamVersion = gotAll ? minVersion : feedReply.mutations.back().version; + feedReply.minStreamVersion = minVersion; + req.reply.send(feedReply); if (feedReply.mutations.back().version == req.end - 1) { req.reply.sendError(end_of_stream());