diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 37e2abb827..3590f2eb81 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -7282,6 +7282,7 @@ ACTOR Future mergeChangeFeedStream(Reference db, *begin = checkVersion + 1; ASSERT(nextOut.back().version >= results->lastReturnedVersion.get()); results->mutations.send(nextOut); + wait(results->mutations.onEmpty()); if (nextOut.back().version > results->lastReturnedVersion.get()) { results->lastReturnedVersion.set(nextOut.back().version); } @@ -7311,6 +7312,7 @@ ACTOR Future mergeChangeFeedStream(Reference db, if (nextOut.size()) { ASSERT(nextOut.back().version >= results->lastReturnedVersion.get()); results->mutations.send(nextOut); + wait(results->mutations.onEmpty()); if (nextOut.back().version > results->lastReturnedVersion.get()) { results->lastReturnedVersion.set(nextOut.back().version); } @@ -7455,32 +7457,33 @@ ACTOR Future getChangeFeedStreamActor(Reference db, wait(results->mutations.onEmpty()); choose { when(wait(cx->connectionFileChanged())) { break; } - when(ChangeFeedStreamReply rep = waitNext(results->streams[0].getFuture())) { - begin = rep.mutations.back().version + 1; + when(state ChangeFeedStreamReply feedReply = waitNext(results->streams[0].getFuture())) { + begin = feedReply.mutations.back().version + 1; // TODO REMOVE, for debugging - if (rep.mutations.back().version < results->lastReturnedVersion.get()) { + if (feedReply.mutations.back().version < results->lastReturnedVersion.get()) { printf("out of order mutation for CF %s from (%d) %s! %lld < %lld\n", rangeID.toString().substr(0, 6).c_str(), results->storageData.size(), results->storageData.empty() ? "????" : results->storageData[0]->id.toString().substr(0, 4).c_str(), - rep.mutations.back().version, + feedReply.mutations.back().version, results->lastReturnedVersion.get()); } - ASSERT(rep.mutations.back().version >= results->lastReturnedVersion.get()); + ASSERT(feedReply.mutations.back().version >= results->lastReturnedVersion.get()); results->mutations.send( - Standalone>(rep.mutations, rep.arena)); - if (rep.mutations.back().version > results->lastReturnedVersion.get()) { - results->lastReturnedVersion.set(rep.mutations.back().version); + Standalone>(feedReply.mutations, feedReply.arena)); + wait(results->mutations.onEmpty()); + if (feedReply.mutations.back().version > results->lastReturnedVersion.get()) { + results->lastReturnedVersion.set(feedReply.mutations.back().version); } - if (!atLatest && rep.atLatestVersion) { + if (!atLatest && feedReply.atLatestVersion) { atLatest = true; results->notAtLatest.set(0); } - if (rep.minStreamVersion > results->storageData[0]->version.get()) { - results->storageData[0]->version.set(rep.minStreamVersion); + if (feedReply.minStreamVersion > results->storageData[0]->version.get()) { + results->storageData[0]->version.set(feedReply.minStreamVersion); } } } diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 68ecdf4c49..e5e1295154 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -3929,7 +3929,8 @@ ACTOR Future fetchChangeFeedApplier(StorageServer* data, } remoteLoc++; } else if (remoteResult[remoteLoc].version == localResult.version) { - if (remoteResult[remoteLoc].mutations.size()) { + if (remoteResult[remoteLoc].mutations.size() && + remoteResult[remoteLoc].mutations.back().param1 != lastEpochEndPrivateKey) { ASSERT(localResult.mutations.size()); remoteResult[remoteLoc].mutations.append( remoteResult.arena(), localResult.mutations.begin(), localResult.mutations.size());