added a sequence number to streaming replies as a safety backstop against out of order delivery

This commit is contained in:
Evan Tschannen 2021-09-15 23:54:38 -07:00
parent 4f14e08547
commit 47ccc75270
5 changed files with 40 additions and 22 deletions

View File

@ -153,7 +153,7 @@ struct StreamReply : ReplyPromiseStreamReply {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, index);
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, index);
}
};

View File

@ -313,7 +313,14 @@ struct GetKeyValuesStreamReply : public ReplyPromiseStreamReply {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, data, version, more, cached, arena);
serializer(ar,
ReplyPromiseStreamReply::acknowledgeToken,
ReplyPromiseStreamReply::sequence,
data,
version,
more,
cached,
arena);
}
};

View File

@ -255,6 +255,7 @@ void setReplyPriority(const ReplyPromise<Reply>& p, TaskPriority taskID) {
struct ReplyPromiseStreamReply {
Optional<UID> acknowledgeToken;
uint16_t sequence;
ReplyPromiseStreamReply() {}
};
@ -277,15 +278,15 @@ struct AcknowledgementReceiver final : FlowReceiver, FastAllocated<Acknowledgeme
using FastAllocated<AcknowledgementReceiver>::operator new;
using FastAllocated<AcknowledgementReceiver>::operator delete;
int64_t bytesSent;
int64_t bytesAcknowledged;
int64_t bytesLimit;
uint16_t sequence = 0;
int64_t bytesSent = 0;
int64_t bytesAcknowledged = 0;
int64_t bytesLimit = 0;
Promise<Void> ready;
Future<Void> failures;
AcknowledgementReceiver() : bytesSent(0), bytesAcknowledged(0), bytesLimit(0), ready(nullptr) {}
AcknowledgementReceiver(const Endpoint& remoteEndpoint)
: FlowReceiver(remoteEndpoint, false), bytesSent(0), bytesAcknowledged(0), bytesLimit(0), ready(nullptr) {}
AcknowledgementReceiver() : ready(nullptr) {}
AcknowledgementReceiver(const Endpoint& remoteEndpoint) : FlowReceiver(remoteEndpoint, false), ready(nullptr) {}
void receive(ArenaObjectReader& reader) override {
ErrorOr<AcknowledgementReply> message;
@ -353,20 +354,29 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
acknowledgements = AcknowledgementReceiver(
FlowTransport::transport().loadedEndpoint(message.get().asUnderlyingType().acknowledgeToken.get()));
}
if (this->shouldFireImmediately()) {
// This message is going to be consumed by the client immediately (and therefore will not call pop()) so
// send an ack immediately
if (acknowledgements.getRawEndpoint().isValid()) {
acknowledgements.bytesAcknowledged += message.get().asUnderlyingType().expectedSize();
FlowTransport::transport().sendUnreliable(
SerializeSource<ErrorOr<AcknowledgementReply>>(
AcknowledgementReply(acknowledgements.bytesAcknowledged)),
acknowledgements.getEndpoint(TaskPriority::ReadSocket),
false);
if (acknowledgements.sequence != message.get().asUnderlyingType().sequence) {
TraceEvent(SevError, "StreamSequenceMismatch")
.detail("Expected", acknowledgements.sequence)
.detail("Actual", message.get().asUnderlyingType().sequence);
ASSERT_WE_THINK(false);
this->sendError(connection_failed());
} else {
acknowledgements.sequence++;
if (this->shouldFireImmediately()) {
// This message is going to be consumed by the client immediately (and therefore will not call
// pop()) so send an ack immediately
if (acknowledgements.getRawEndpoint().isValid()) {
acknowledgements.bytesAcknowledged += message.get().asUnderlyingType().expectedSize();
FlowTransport::transport().sendUnreliable(
SerializeSource<ErrorOr<AcknowledgementReply>>(
AcknowledgementReply(acknowledgements.bytesAcknowledged)),
acknowledgements.getEndpoint(TaskPriority::ReadSocket),
false);
}
}
}
this->send(std::move(message.get().asUnderlyingType()));
this->send(std::move(message.get().asUnderlyingType()));
}
}
this->delPromiseRef();
}
@ -420,6 +430,7 @@ public:
// register acknowledge receiver on sender and tell the receiver where to send acknowledge messages
value.acknowledgeToken = queue->acknowledgements.getEndpoint(TaskPriority::ReadSocket).token;
}
value.sequence = queue->acknowledgements.sequence++;
queue->acknowledgements.bytesSent += value.expectedSize();
FlowTransport::transport().sendUnreliable(
SerializeSource<ErrorOr<EnsureTable<T>>>(value), getEndpoint(), false);

View File

@ -68,7 +68,7 @@ struct NetworkTestStreamingReply : ReplyPromiseStreamReply {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, index);
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, index);
}
};

View File

@ -226,7 +226,7 @@ struct TLogPeekStreamReply : public ReplyPromiseStreamReply {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, rep);
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, rep);
}
};