diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 1424cb6bd5..7944391f44 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -525,11 +525,6 @@ ACTOR static Future connectionReader(TransportData* transport, Promise> onConnected); static void sendLocal(TransportData* self, ISerializeSource const& what, const Endpoint& destination); -static void writePacket(PacketBuffer* pb, - PacketWriter& wr, - Endpoint destination, - ISerializeSource const& what, - bool& warnAlwaysForLargePacket); static ReliablePacket* sendPacket(TransportData* self, Reference peer, ISerializeSource const& what, @@ -950,7 +945,8 @@ void Peer::prependConnectPacket() { pkt.protocolVersion.addObjectSerializerFlag(); pkt.connectionId = transport->transportId; - PacketBuffer* pb_first = PacketBuffer::create(); + PacketBuffer* pb_first = PacketBuffer::create(), *pb_end = nullptr; + auto checksumPb = pb_first; PacketWriter wr(pb_first, nullptr, Unversioned()); pkt.serialize(wr); if (!transport->tokens.empty()) { @@ -958,14 +954,88 @@ void Peer::prependConnectPacket() { for (auto t : transport->tokens) { req.tokens.push_back(req.arena, t.second); } + SerializeSource what(req); ++transport->countPacketsGenerated; - writePacket(pb_first, - wr, - Endpoint::wellKnown({ destination }, WLTOKEN_AUTH_TENANT), - SerializeSource(req), - transport->warnAlwaysForLargePacket); + SplitBuffer packetInfoBuffer; + uint32_t len; + int packetInfoSize = PACKET_LEN_WIDTH; + XXH64_hash_t checksum; + XXH3_state_t checksumState; + bool checksumStream = false; + const bool checksumEnabled = !destination.isTLS(); + + if (checksumEnabled) { + packetInfoSize += sizeof(checksum); + } + wr.writeAhead(packetInfoSize, &packetInfoBuffer); + wr << Endpoint::wellKnownToken(WLTOKEN_AUTH_TENANT); + what.serializePacketWriter(wr); + pb_end = wr.finish(); + len = wr.size() - packetInfoSize - pkt.totalPacketSize(); + if (checksumEnabled) { + // Find the correct place to start calculating checksum + uint32_t checksumUnprocessedLength = len; + int prevBytesWritten = pkt.totalPacketSize(); + prevBytesWritten += packetInfoSize; + if (prevBytesWritten >= checksumPb->bytes_written) { + prevBytesWritten -= checksumPb->bytes_written; + checksumPb = checksumPb->nextPacketBuffer(); + } + + // Checksum calculation + while (checksumUnprocessedLength > 0) { + uint32_t processLength = + std::min(checksumUnprocessedLength, (uint32_t)(checksumPb->bytes_written - prevBytesWritten)); + + // If not in checksum stream mode yet + if (!checksumStream) { + // If there is nothing left to process then calculate checksum directly + if (processLength == checksumUnprocessedLength) { + checksum = XXH3_64bits(checksumPb->data() + prevBytesWritten, processLength); + } else { + // Otherwise, initialize checksum state and switch to stream mode + if (XXH3_64bits_reset(&checksumState) != XXH_OK) { + throw internal_error(); + } + checksumStream = true; + } + } + + // If in checksum stream mode, update the checksum state + if (checksumStream) { + if (XXH3_64bits_update(&checksumState, checksumPb->data() + prevBytesWritten, processLength) != + XXH_OK) { + throw internal_error(); + } + } + + checksumUnprocessedLength -= processLength; + checksumPb = checksumPb->nextPacketBuffer(); + prevBytesWritten = 0; + } + + // If in checksum stream mode, get the final checksum + if (checksumStream) { + checksum = XXH3_64bits_digest(&checksumState); + } + } + packetInfoBuffer.write(&len, sizeof(len)); + if (checksumEnabled) { + packetInfoBuffer.write(&checksum, sizeof(checksum), sizeof(len)); + } + } else { + pb_end = wr.finish(); } - unsent.prependWriteBuffer(pb_first, wr.finish()); +#if VALGRIND + SendBuffer* checkbuf = pb_first; + while (checkbuf) { + int size = checkbuf->bytes_written; + const uint8_t* data = checkbuf->data(); + VALGRIND_CHECK_MEM_IS_DEFINED(data, size); + checkbuf = checkbuf->next; + } +#endif + unsent.prependWriteBuffer(pb_first, pb_end); } void Peer::discardUnreliablePackets() { @@ -1763,14 +1833,34 @@ static void sendLocal(TransportData* self, ISerializeSource const& what, const E } } -static void writePacket(PacketBuffer* pb, - PacketWriter& wr, - Endpoint destination, - ISerializeSource const& what, - bool& warnAlwaysForLargePacket) { +static ReliablePacket* sendPacket(TransportData* self, + Reference peer, + ISerializeSource const& what, + const Endpoint& destination, + bool reliable) { const bool checksumEnabled = !destination.getPrimaryAddress().isTLS(); + ++self->countPacketsGenerated; + + // If there isn't an open connection, a public address, or the peer isn't compatible, we can't send + if (!peer || (peer->outgoingConnectionIdle && !destination.getPrimaryAddress().isPublic()) || + (peer->incompatibleProtocolVersionNewer && + destination.token != Endpoint::wellKnownToken(WLTOKEN_PING_PACKET))) { + TEST(true); // Can't send to private address without a compatible open connection + return nullptr; + } + + bool firstUnsent = peer->unsent.empty(); + + PacketBuffer* pb = peer->unsent.getWriteBuffer(); + ReliablePacket* rp = reliable ? new ReliablePacket : 0; + int prevBytesWritten = pb->bytes_written; PacketBuffer* checksumPb = pb; + + PacketWriter wr(pb, + rp, + AssumeVersion(g_network->protocolVersion())); // SOMEDAY: Can we downgrade to talk to older peers? + // Reserve some space for packet length and checksum, write them after serializing data SplitBuffer packetInfoBuffer; uint32_t len; @@ -1852,7 +1942,7 @@ static void writePacket(PacketBuffer* pb, .detail("Length", (int)len); // throw platform_error(); // FIXME: How to recover from this situation? } else if (len > FLOW_KNOBS->PACKET_WARNING) { - TraceEvent(warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "LargePacketSent") + TraceEvent(self->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "LargePacketSent") .suppressFor(1.0) .detail("ToPeer", destination.getPrimaryAddress()) .detail("Length", (int)len) @@ -1860,7 +1950,7 @@ static void writePacket(PacketBuffer* pb, .backtrace(); if (g_network->isSimulated()) - warnAlwaysForLargePacket = false; + self->warnAlwaysForLargePacket = false; } #if VALGRIND @@ -1872,33 +1962,6 @@ static void writePacket(PacketBuffer* pb, checkbuf = checkbuf->next; } #endif -} - -static ReliablePacket* sendPacket(TransportData* self, - Reference peer, - ISerializeSource const& what, - const Endpoint& destination, - bool reliable) { - ++self->countPacketsGenerated; - - // If there isn't an open connection, a public address, or the peer isn't compatible, we can't send - if (!peer || (peer->outgoingConnectionIdle && !destination.getPrimaryAddress().isPublic()) || - (peer->incompatibleProtocolVersionNewer && - destination.token != Endpoint::wellKnownToken(WLTOKEN_PING_PACKET))) { - TEST(true); // Can't send to private address without a compatible open connection - return nullptr; - } - - bool firstUnsent = peer->unsent.empty(); - - PacketBuffer* pb = peer->unsent.getWriteBuffer(); - ReliablePacket* rp = reliable ? new ReliablePacket : 0; - - PacketWriter wr(pb, - rp, - AssumeVersion(g_network->protocolVersion())); // SOMEDAY: Can we downgrade to talk to older peers? - - writePacket(pb, wr, destination, what, self->warnAlwaysForLargePacket); peer->send(pb, rp, firstUnsent); if (destination.token != Endpoint::wellKnownToken(WLTOKEN_PING_PACKET)) {