Fix memory issue

This commit is contained in:
Markus Pilman 2022-04-21 14:12:16 -06:00
parent b61c9ab329
commit 1da1f8cc0f
1 changed files with 109 additions and 46 deletions

View File

@ -525,11 +525,6 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
Promise<Reference<struct Peer>> onConnected);
static void sendLocal(TransportData* self, ISerializeSource const& what, const Endpoint& destination);
static void writePacket(PacketBuffer* pb,
PacketWriter& wr,
Endpoint destination,
ISerializeSource const& what,
bool& warnAlwaysForLargePacket);
static ReliablePacket* sendPacket(TransportData* self,
Reference<Peer> peer,
ISerializeSource const& what,
@ -950,7 +945,8 @@ void Peer::prependConnectPacket() {
pkt.protocolVersion.addObjectSerializerFlag();
pkt.connectionId = transport->transportId;
PacketBuffer* pb_first = PacketBuffer::create();
PacketBuffer* pb_first = PacketBuffer::create(), *pb_end = nullptr;
auto checksumPb = pb_first;
PacketWriter wr(pb_first, nullptr, Unversioned());
pkt.serialize(wr);
if (!transport->tokens.empty()) {
@ -958,14 +954,88 @@ void Peer::prependConnectPacket() {
for (auto t : transport->tokens) {
req.tokens.push_back(req.arena, t.second);
}
SerializeSource<AuthorizationRequest> what(req);
++transport->countPacketsGenerated;
writePacket(pb_first,
wr,
Endpoint::wellKnown({ destination }, WLTOKEN_AUTH_TENANT),
SerializeSource<AuthorizationRequest>(req),
transport->warnAlwaysForLargePacket);
SplitBuffer packetInfoBuffer;
uint32_t len;
int packetInfoSize = PACKET_LEN_WIDTH;
XXH64_hash_t checksum;
XXH3_state_t checksumState;
bool checksumStream = false;
const bool checksumEnabled = !destination.isTLS();
if (checksumEnabled) {
packetInfoSize += sizeof(checksum);
}
wr.writeAhead(packetInfoSize, &packetInfoBuffer);
wr << Endpoint::wellKnownToken(WLTOKEN_AUTH_TENANT);
what.serializePacketWriter(wr);
pb_end = wr.finish();
len = wr.size() - packetInfoSize - pkt.totalPacketSize();
if (checksumEnabled) {
// Find the correct place to start calculating checksum
uint32_t checksumUnprocessedLength = len;
int prevBytesWritten = pkt.totalPacketSize();
prevBytesWritten += packetInfoSize;
if (prevBytesWritten >= checksumPb->bytes_written) {
prevBytesWritten -= checksumPb->bytes_written;
checksumPb = checksumPb->nextPacketBuffer();
}
// Checksum calculation
while (checksumUnprocessedLength > 0) {
uint32_t processLength =
std::min(checksumUnprocessedLength, (uint32_t)(checksumPb->bytes_written - prevBytesWritten));
// If not in checksum stream mode yet
if (!checksumStream) {
// If there is nothing left to process then calculate checksum directly
if (processLength == checksumUnprocessedLength) {
checksum = XXH3_64bits(checksumPb->data() + prevBytesWritten, processLength);
} else {
// Otherwise, initialize checksum state and switch to stream mode
if (XXH3_64bits_reset(&checksumState) != XXH_OK) {
throw internal_error();
}
checksumStream = true;
}
}
// If in checksum stream mode, update the checksum state
if (checksumStream) {
if (XXH3_64bits_update(&checksumState, checksumPb->data() + prevBytesWritten, processLength) !=
XXH_OK) {
throw internal_error();
}
}
checksumUnprocessedLength -= processLength;
checksumPb = checksumPb->nextPacketBuffer();
prevBytesWritten = 0;
}
// If in checksum stream mode, get the final checksum
if (checksumStream) {
checksum = XXH3_64bits_digest(&checksumState);
}
}
packetInfoBuffer.write(&len, sizeof(len));
if (checksumEnabled) {
packetInfoBuffer.write(&checksum, sizeof(checksum), sizeof(len));
}
} else {
pb_end = wr.finish();
}
unsent.prependWriteBuffer(pb_first, wr.finish());
#if VALGRIND
SendBuffer* checkbuf = pb_first;
while (checkbuf) {
int size = checkbuf->bytes_written;
const uint8_t* data = checkbuf->data();
VALGRIND_CHECK_MEM_IS_DEFINED(data, size);
checkbuf = checkbuf->next;
}
#endif
unsent.prependWriteBuffer(pb_first, pb_end);
}
void Peer::discardUnreliablePackets() {
@ -1763,14 +1833,34 @@ static void sendLocal(TransportData* self, ISerializeSource const& what, const E
}
}
static void writePacket(PacketBuffer* pb,
PacketWriter& wr,
Endpoint destination,
ISerializeSource const& what,
bool& warnAlwaysForLargePacket) {
static ReliablePacket* sendPacket(TransportData* self,
Reference<Peer> peer,
ISerializeSource const& what,
const Endpoint& destination,
bool reliable) {
const bool checksumEnabled = !destination.getPrimaryAddress().isTLS();
++self->countPacketsGenerated;
// If there isn't an open connection, a public address, or the peer isn't compatible, we can't send
if (!peer || (peer->outgoingConnectionIdle && !destination.getPrimaryAddress().isPublic()) ||
(peer->incompatibleProtocolVersionNewer &&
destination.token != Endpoint::wellKnownToken(WLTOKEN_PING_PACKET))) {
TEST(true); // Can't send to private address without a compatible open connection
return nullptr;
}
bool firstUnsent = peer->unsent.empty();
PacketBuffer* pb = peer->unsent.getWriteBuffer();
ReliablePacket* rp = reliable ? new ReliablePacket : 0;
int prevBytesWritten = pb->bytes_written;
PacketBuffer* checksumPb = pb;
PacketWriter wr(pb,
rp,
AssumeVersion(g_network->protocolVersion())); // SOMEDAY: Can we downgrade to talk to older peers?
// Reserve some space for packet length and checksum, write them after serializing data
SplitBuffer packetInfoBuffer;
uint32_t len;
@ -1852,7 +1942,7 @@ static void writePacket(PacketBuffer* pb,
.detail("Length", (int)len);
// throw platform_error(); // FIXME: How to recover from this situation?
} else if (len > FLOW_KNOBS->PACKET_WARNING) {
TraceEvent(warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "LargePacketSent")
TraceEvent(self->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "LargePacketSent")
.suppressFor(1.0)
.detail("ToPeer", destination.getPrimaryAddress())
.detail("Length", (int)len)
@ -1860,7 +1950,7 @@ static void writePacket(PacketBuffer* pb,
.backtrace();
if (g_network->isSimulated())
warnAlwaysForLargePacket = false;
self->warnAlwaysForLargePacket = false;
}
#if VALGRIND
@ -1872,33 +1962,6 @@ static void writePacket(PacketBuffer* pb,
checkbuf = checkbuf->next;
}
#endif
}
static ReliablePacket* sendPacket(TransportData* self,
Reference<Peer> peer,
ISerializeSource const& what,
const Endpoint& destination,
bool reliable) {
++self->countPacketsGenerated;
// If there isn't an open connection, a public address, or the peer isn't compatible, we can't send
if (!peer || (peer->outgoingConnectionIdle && !destination.getPrimaryAddress().isPublic()) ||
(peer->incompatibleProtocolVersionNewer &&
destination.token != Endpoint::wellKnownToken(WLTOKEN_PING_PACKET))) {
TEST(true); // Can't send to private address without a compatible open connection
return nullptr;
}
bool firstUnsent = peer->unsent.empty();
PacketBuffer* pb = peer->unsent.getWriteBuffer();
ReliablePacket* rp = reliable ? new ReliablePacket : 0;
PacketWriter wr(pb,
rp,
AssumeVersion(g_network->protocolVersion())); // SOMEDAY: Can we downgrade to talk to older peers?
writePacket(pb, wr, destination, what, self->warnAlwaysForLargePacket);
peer->send(pb, rp, firstUnsent);
if (destination.token != Endpoint::wellKnownToken(WLTOKEN_PING_PACKET)) {