diff --git a/documentation/tutorial/tutorial.actor.cpp b/documentation/tutorial/tutorial.actor.cpp index 87fee7f2ce..cb2f20e239 100644 --- a/documentation/tutorial/tutorial.actor.cpp +++ b/documentation/tutorial/tutorial.actor.cpp @@ -153,7 +153,7 @@ struct StreamReply : ReplyPromiseStreamReply { template void serialize(Ar& ar) { - serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, index); + serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, index); } }; diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index ccae14168e..fdc37287a5 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -313,7 +313,14 @@ struct GetKeyValuesStreamReply : public ReplyPromiseStreamReply { template void serialize(Ar& ar) { - serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, data, version, more, cached, arena); + serializer(ar, + ReplyPromiseStreamReply::acknowledgeToken, + ReplyPromiseStreamReply::sequence, + data, + version, + more, + cached, + arena); } }; diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h index 31711071bf..835a67a93a 100644 --- a/fdbrpc/fdbrpc.h +++ b/fdbrpc/fdbrpc.h @@ -255,6 +255,7 @@ void setReplyPriority(const ReplyPromise& p, TaskPriority taskID) { struct ReplyPromiseStreamReply { Optional acknowledgeToken; + uint16_t sequence; ReplyPromiseStreamReply() {} }; @@ -277,15 +278,15 @@ struct AcknowledgementReceiver final : FlowReceiver, FastAllocated::operator new; using FastAllocated::operator delete; - int64_t bytesSent; - int64_t bytesAcknowledged; - int64_t bytesLimit; + uint16_t sequence = 0; + int64_t bytesSent = 0; + int64_t bytesAcknowledged = 0; + int64_t bytesLimit = 0; Promise ready; Future failures; - AcknowledgementReceiver() : bytesSent(0), bytesAcknowledged(0), bytesLimit(0), ready(nullptr) {} - AcknowledgementReceiver(const Endpoint& remoteEndpoint) - : FlowReceiver(remoteEndpoint, false), bytesSent(0), bytesAcknowledged(0), bytesLimit(0), ready(nullptr) {} + AcknowledgementReceiver() : ready(nullptr) {} + AcknowledgementReceiver(const Endpoint& remoteEndpoint) : FlowReceiver(remoteEndpoint, false), ready(nullptr) {} void receive(ArenaObjectReader& reader) override { ErrorOr message; @@ -353,20 +354,29 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue, acknowledgements = AcknowledgementReceiver( FlowTransport::transport().loadedEndpoint(message.get().asUnderlyingType().acknowledgeToken.get())); } - if (this->shouldFireImmediately()) { - // This message is going to be consumed by the client immediately (and therefore will not call pop()) so - // send an ack immediately - if (acknowledgements.getRawEndpoint().isValid()) { - acknowledgements.bytesAcknowledged += message.get().asUnderlyingType().expectedSize(); - FlowTransport::transport().sendUnreliable( - SerializeSource>( - AcknowledgementReply(acknowledgements.bytesAcknowledged)), - acknowledgements.getEndpoint(TaskPriority::ReadSocket), - false); + if (acknowledgements.sequence != message.get().asUnderlyingType().sequence) { + TraceEvent(SevError, "StreamSequenceMismatch") + .detail("Expected", acknowledgements.sequence) + .detail("Actual", message.get().asUnderlyingType().sequence); + ASSERT_WE_THINK(false); + this->sendError(connection_failed()); + } else { + acknowledgements.sequence++; + if (this->shouldFireImmediately()) { + // This message is going to be consumed by the client immediately (and therefore will not call + // pop()) so send an ack immediately + if (acknowledgements.getRawEndpoint().isValid()) { + acknowledgements.bytesAcknowledged += message.get().asUnderlyingType().expectedSize(); + FlowTransport::transport().sendUnreliable( + SerializeSource>( + AcknowledgementReply(acknowledgements.bytesAcknowledged)), + acknowledgements.getEndpoint(TaskPriority::ReadSocket), + false); + } } - } - this->send(std::move(message.get().asUnderlyingType())); + this->send(std::move(message.get().asUnderlyingType())); + } } this->delPromiseRef(); } @@ -420,6 +430,7 @@ public: // register acknowledge receiver on sender and tell the receiver where to send acknowledge messages value.acknowledgeToken = queue->acknowledgements.getEndpoint(TaskPriority::ReadSocket).token; } + value.sequence = queue->acknowledgements.sequence++; queue->acknowledgements.bytesSent += value.expectedSize(); FlowTransport::transport().sendUnreliable( SerializeSource>>(value), getEndpoint(), false); diff --git a/fdbserver/NetworkTest.h b/fdbserver/NetworkTest.h index a6aa60708b..5106167b33 100644 --- a/fdbserver/NetworkTest.h +++ b/fdbserver/NetworkTest.h @@ -68,7 +68,7 @@ struct NetworkTestStreamingReply : ReplyPromiseStreamReply { template void serialize(Ar& ar) { - serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, index); + serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, index); } }; diff --git a/fdbserver/TLogInterface.h b/fdbserver/TLogInterface.h index 1ca049927d..e51e88ad30 100644 --- a/fdbserver/TLogInterface.h +++ b/fdbserver/TLogInterface.h @@ -226,7 +226,7 @@ struct TLogPeekStreamReply : public ReplyPromiseStreamReply { template void serialize(Ar& ar) { - serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, rep); + serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, rep); } };