Better handling for large packets

On the sending side, a large packet is split into smaller pieces. On the
receiving side, use packet length to allocate buffer to avoid multiple memcpy
and allocations.
This commit is contained in:
Jingyu Zhou 2019-06-11 16:44:00 -07:00
parent cc69bc14ca
commit b151141965
1 changed files with 31 additions and 24 deletions

View File

@ -339,7 +339,7 @@ struct Peer : NonCopyable {
pkt.connectionId = transport->transportId;
PacketBuffer* pb_first = new PacketBuffer;
PacketWriter wr( pb_first, NULL, Unversioned() );
PacketWriter wr( pb_first, nullptr, Unversioned() );
pkt.serialize(wr);
unsent.prependWriteBuffer(pb_first, wr.finish());
}
@ -351,7 +351,7 @@ struct Peer : NonCopyable {
// If there are reliable packets, compact reliable packets into a new unsent range
if(!reliable.empty()) {
PacketBuffer* pb = unsent.getWriteBuffer();
pb = reliable.compact(pb, NULL);
pb = reliable.compact(pb, nullptr);
unsent.setWriteBuffer(pb);
}
}
@ -444,7 +444,7 @@ struct Peer : NonCopyable {
loop {
lastWriteTime = now();
int sent = conn->write( self->unsent.getUnsent() );
int sent = conn->write(self->unsent.getUnsent(), 200 * 1024); // avoid sending large packet
if (sent) {
self->transport->bytesSent += sent;
self->unsent.sent(sent);
@ -637,18 +637,14 @@ ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader
g_network->setCurrentTask( TaskReadSocket );
}
static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, uint8_t* e, Arena& arena,
static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, const uint8_t* e, Arena& arena,
NetworkAddress const& peerAddress, ProtocolVersion peerProtocolVersion) {
// Find each complete packet in the given byte range and queue a ready task to deliver it.
// Remove the complete packets from the range by increasing unprocessed_begin.
// There won't be more than 64K of data plus one packet, so this shouldn't take a long time.
uint8_t* p = unprocessed_begin;
bool checksumEnabled = true;
if (peerAddress.isTLS()) {
checksumEnabled = false;
}
const bool checksumEnabled = !peerAddress.isTLS();
loop {
uint32_t packetLen, packetChecksum;
@ -734,6 +730,21 @@ static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, u
}
}
// Given unprocessed buffer [begin, end), check if next packet size is known and return
// enough size for the next packet, whose format is: {size, optional_checksum, data}.
static int getNewBufferSize(uint8_t* begin, uint8_t* end, const NetworkAddress& peerAddress) {
const int len = end - begin;
if (len < sizeof(uint32_t)) {
return std::max(65536, len * 2);
}
const uint32_t packetLen = *(uint32_t*)begin;
if (packetLen > FLOW_KNOBS->PACKET_LIMIT) {
TraceEvent(SevError, "Net2_PacketLimitExceeded").detail("FromPeer", peerAddress.toString()).detail("Length", (int)packetLen);
throw platform_error();
}
return std::max<uint32_t>(4096, packetLen + sizeof(uint32_t) * (peerAddress.isTLS() ? 1 : 2));
}
ACTOR static Future<Void> connectionReader(
TransportData* transport,
Reference<IConnection> conn,
@ -741,12 +752,12 @@ ACTOR static Future<Void> connectionReader(
Promise<Peer*> onConnected)
{
// This actor exists whenever there is an open or opening connection, whether incoming or outgoing
// For incoming connections conn is set and peer is initially NULL; for outgoing connections it is the reverse
// For incoming connections conn is set and peer is initially nullptr; for outgoing connections it is the reverse
state Arena arena;
state uint8_t* unprocessed_begin = NULL;
state uint8_t* unprocessed_end = NULL;
state uint8_t* buffer_end = NULL;
state uint8_t* unprocessed_begin = nullptr;
state uint8_t* unprocessed_end = nullptr;
state uint8_t* buffer_end = nullptr;
state bool expectConnectPacket = true;
state bool compatible = false;
state bool incompatiblePeerCounted = false;
@ -764,9 +775,9 @@ ACTOR static Future<Void> connectionReader(
int readAllBytes = buffer_end - unprocessed_end;
if (readAllBytes < 4096) {
Arena newArena;
int unproc_len = unprocessed_end - unprocessed_begin;
int len = std::max( 65536, unproc_len*2 );
uint8_t* newBuffer = new (newArena) uint8_t[ len ];
const int unproc_len = unprocessed_end - unprocessed_begin;
const int len = getNewBufferSize(unprocessed_begin, unprocessed_end, peerAddress);
uint8_t* const newBuffer = new (newArena) uint8_t[ len ];
memcpy( newBuffer, unprocessed_begin, unproc_len );
arena = newArena;
unprocessed_begin = newBuffer;
@ -946,7 +957,7 @@ Peer* TransportData::getPeer( NetworkAddress const& address, bool openConnection
return peer->second;
}
if(!openConnection) {
return NULL;
return nullptr;
}
Peer* newPeer = new Peer(this, address);
peers[address] = newPeer;
@ -1100,13 +1111,9 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
ASSERT(copy.size() > 0);
deliver(self, destination, ArenaReader(copy.arena(), copy, AssumeVersion(currentProtocolVersion)), false);
return (PacketID)NULL;
return (PacketID)nullptr;
} else {
bool checksumEnabled = true;
if (destination.getPrimaryAddress().isTLS()) {
checksumEnabled = false;
}
const bool checksumEnabled = !destination.getPrimaryAddress().isTLS();
++self->countPacketsGenerated;
Peer* peer = self->getPeer(destination.getPrimaryAddress(), openConnection);
@ -1114,7 +1121,7 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
// If there isn't an open connection, a public address, or the peer isn't compatible, we can't send
if (!peer || (peer->outgoingConnectionIdle && !destination.getPrimaryAddress().isPublic()) || (peer->incompatibleProtocolVersionNewer && destination.token != WLTOKEN_PING_PACKET)) {
TEST(true); // Can't send to private address without a compatible open connection
return (PacketID)NULL;
return (PacketID)nullptr;
}
bool firstUnsent = peer->unsent.empty();