Performance improvements in socket use and buffering which results in TLS network connections using around 3.5x less CPU.

PacketBuffers are 16k minimum sizes, using FastAlloc.  All calling paths to PacketBuffer creation now pass a size hint if known.  P2PNetworkTest improved in many ways, errors go to TraceEvents instead of stdout, error counts are reported in stats, range min/max parameters are combined into "min:max" strings, new options to "stutter" connections by delaying randomly before socket read or write, message size to send is now configured separately for incoming vs outgoing sessions.
This commit is contained in:
Steve Atherton 2020-06-25 20:44:43 -07:00
parent 813dbfb297
commit 0da4c91ad5
11 changed files with 170 additions and 58 deletions

View File

@ -3849,7 +3849,8 @@ int main(int argc, char* argv[]) {
<< FastAllocator<1024>::pageCount << " " << FastAllocator<1024>::pageCount << " "
<< FastAllocator<2048>::pageCount << " " << FastAllocator<2048>::pageCount << " "
<< FastAllocator<4096>::pageCount << " " << FastAllocator<4096>::pageCount << " "
<< FastAllocator<8192>::pageCount << endl; << FastAllocator<8192>::pageCount << " "
<< FastAllocator<16384>::pageCount << endl;
vector< std::pair<std::string, const char*> > typeNames; vector< std::pair<std::string, const char*> > typeNames;
for( auto i = allocInstr.begin(); i != allocInstr.end(); ++i ) { for( auto i = allocInstr.begin(); i != allocInstr.end(); ++i ) {

View File

@ -59,7 +59,7 @@ public:
virtual void delref() { ReferenceCounted<AsyncFileBlobStoreWrite>::delref(); } virtual void delref() { ReferenceCounted<AsyncFileBlobStoreWrite>::delref(); }
struct Part : ReferenceCounted<Part> { struct Part : ReferenceCounted<Part> {
Part(int n) : number(n), writer(content.getWriteBuffer(), NULL, Unversioned()), length(0) { Part(int n, int minSize) : number(n), writer(content.getWriteBuffer(minSize), NULL, Unversioned()), length(0) {
etag = std::string(); etag = std::string();
::MD5_Init(&content_md5_buf); ::MD5_Init(&content_md5_buf);
} }
@ -231,7 +231,7 @@ private:
// Make a new part to write to // Make a new part to write to
if(startNew) if(startNew)
f->m_parts.push_back(Reference<Part>(new Part(f->m_parts.size() + 1))); f->m_parts.push_back(Reference<Part>(new Part(f->m_parts.size() + 1, f->m_bstore->knobs.multipart_min_part_size)));
return Void(); return Void();
} }
@ -247,7 +247,7 @@ public:
: m_bstore(bstore), m_bucket(bucket), m_object(object), m_cursor(0), m_concurrentUploads(bstore->knobs.concurrent_writes_per_file) { : m_bstore(bstore), m_bucket(bucket), m_object(object), m_cursor(0), m_concurrentUploads(bstore->knobs.concurrent_writes_per_file) {
// Add first part // Add first part
m_parts.push_back(Reference<Part>(new Part(1))); m_parts.push_back(Reference<Part>(new Part(1, m_bstore->knobs.multipart_min_part_size)));
} }
}; };

View File

@ -1050,7 +1050,7 @@ ACTOR Future<Void> writeEntireFileFromBuffer_impl(Reference<BlobStoreEndpoint> b
ACTOR Future<Void> writeEntireFile_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object, std::string content) { ACTOR Future<Void> writeEntireFile_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object, std::string content) {
state UnsentPacketQueue packets; state UnsentPacketQueue packets;
PacketWriter pw(packets.getWriteBuffer(), NULL, Unversioned()); PacketWriter pw(packets.getWriteBuffer(content.size()), NULL, Unversioned());
pw.serializeBytes(content); pw.serializeBytes(content);
if(content.size() > bstore->knobs.multipart_max_part_size) if(content.size() > bstore->knobs.multipart_max_part_size)
throw file_too_large(); throw file_too_large();
@ -1173,7 +1173,7 @@ ACTOR Future<Void> finishMultiPartUpload_impl(Reference<BlobStoreEndpoint> bstor
std::string resource = format("/%s/%s?uploadId=%s", bucket.c_str(), object.c_str(), uploadID.c_str()); std::string resource = format("/%s/%s?uploadId=%s", bucket.c_str(), object.c_str(), uploadID.c_str());
HTTP::Headers headers; HTTP::Headers headers;
PacketWriter pw(part_list.getWriteBuffer(), NULL, Unversioned()); PacketWriter pw(part_list.getWriteBuffer(manifest.size()), NULL, Unversioned());
pw.serializeBytes(manifest); pw.serializeBytes(manifest);
Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, &part_list, manifest.size(), {200})); Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, &part_list, manifest.size(), {200}));
// TODO: In the event that the client times out just before the request completes (so the client is unaware) then the next retry // TODO: In the event that the client times out just before the request completes (so the client is unaware) then the next retry

View File

@ -414,20 +414,39 @@ ACTOR Future<Void> connectionWriter( Reference<Peer> self, Reference<IConnection
//wait( delay(500e-6, TaskPriority::WriteSocket) ); //wait( delay(500e-6, TaskPriority::WriteSocket) );
//wait( yield(TaskPriority::WriteSocket) ); //wait( yield(TaskPriority::WriteSocket) );
state int bytesBeforeDelay = FLOW_KNOBS->MAX_PACKET_SEND_BYTES;
// Send until there is nothing left to send // Send until there is nothing left to send
loop { loop {
lastWriteTime = now(); lastWriteTime = now();
int sent = conn->write(self->unsent.getUnsent(), /* limit= */ FLOW_KNOBS->MAX_PACKET_SEND_BYTES); int sent = conn->write(self->unsent.getUnsent(), bytesBeforeDelay);
if (sent) { if (sent) {
self->transport->bytesSent += sent; self->transport->bytesSent += sent;
bytesBeforeDelay -= sent;
self->unsent.sent(sent); self->unsent.sent(sent);
} }
if (self->unsent.empty()) break;
if (self->unsent.empty()) {
break;
}
TEST(true); // We didn't write everything, so apparently the write buffer is full. Wait for it to be nonfull. TEST(true); // We didn't write everything, so apparently the write buffer is full. Wait for it to be nonfull.
wait( conn->onWritable() ); wait( conn->onWritable() );
wait( yield(TaskPriority::WriteSocket) );
// After the first write on a connection, onWritable() must be called before every write, because
// otherwise for some reason with TLS connections calling write() too much can result in a socket
// closure rather than returning 0 or some kind of WouldBlock error. This, combined with the fact
// that TLS connections will only read from the first PacketBuffer in the chain, leads to this
// loop being executed far more often than for non-TLS connections. To mitigate this, and since
// MAX_PACKET_SEND_BYTES is intended to be the most bytes from the PacketBuffer chain that can be
// sent without interruption, we will only yield one we have sent MAX_PACKET_SEND_BYTES since the
// last yield.
if (bytesBeforeDelay <= 0) {
wait(delay(0, TaskPriority::WriteSocket));
bytesBeforeDelay = FLOW_KNOBS->MAX_PACKET_SEND_BYTES;
}
} }
// Wait until there is something to send // Wait until there is something to send

View File

@ -2015,7 +2015,8 @@ int main(int argc, char* argv[]) {
<< FastAllocator<1024>::pageCount << " " << FastAllocator<1024>::pageCount << " "
<< FastAllocator<2048>::pageCount << " " << FastAllocator<2048>::pageCount << " "
<< FastAllocator<4096>::pageCount << " " << FastAllocator<4096>::pageCount << " "
<< FastAllocator<8192>::pageCount << std::endl; << FastAllocator<8192>::pageCount << " "
<< FastAllocator<16384>::pageCount << std::endl;
vector< std::pair<std::string, const char*> > typeNames; vector< std::pair<std::string, const char*> > typeNames;
for( auto i = allocInstr.begin(); i != allocInstr.end(); ++i ) { for( auto i = allocInstr.begin(); i != allocInstr.end(); ++i ) {

View File

@ -240,30 +240,69 @@ ACTOR Future<Void> networkTestClient( std:: string testServers ) {
struct RandomIntRange { struct RandomIntRange {
int min; int min;
int max; int max;
RandomIntRange(int low = 0, int high = 0) : min(low), max(high) {
}
// Accepts strings of the form "min:max" or "N"
// where N will be used for both min and max
RandomIntRange(std::string str) {
StringRef high = str;
StringRef low = high.eat(":");
if(high.size() == 0) {
high = low;
}
min = low.size() == 0 ? 0 : atol(low.toString().c_str());
max = high.size() == 0 ? 0 : atol(high.toString().c_str());
if(min > max) {
std::swap(min, max);
}
}
int get() const { int get() const {
return nondeterministicRandom()->randomInt(min, max + 1); return (max == 0) ? 0 : nondeterministicRandom()->randomInt(min, max + 1);
}
std::string toString() const {
return format("%d:%d", min, max);
} }
}; };
struct P2PNetworkTest { struct P2PNetworkTest {
// Addresses to listen on
std::vector<Reference<IListener>> listeners; std::vector<Reference<IListener>> listeners;
// Addresses to randomly connect to
std::vector<NetworkAddress> remotes; std::vector<NetworkAddress> remotes;
// Number of outgoing connections to maintain
int connectionsOut; int connectionsOut;
RandomIntRange msgBytes; // Message size range to send on outgoing established connections
RandomIntRange sendMsgBytes;
// Message size range to send on incoming established connections
RandomIntRange recvMsgBytes;
// Delay after message send and receive are complete before closing connection
RandomIntRange idleMilliseconds; RandomIntRange idleMilliseconds;
// Random delay before socket reads
RandomIntRange waitReadMilliseconds;
// Random delay before socket writes
RandomIntRange waitWriteMilliseconds;
double startTime; double startTime;
int64_t bytesSent; int64_t bytesSent;
int64_t bytesReceived; int64_t bytesReceived;
int sessionsIn; int sessionsIn;
int sessionsOut; int sessionsOut;
int connectErrors;
int acceptErrors;
int sessionErrors;
Standalone<StringRef> msgBuffer; Standalone<StringRef> msgBuffer;
int sendRecvSize;
std::string statsString() { std::string statsString() {
double elapsed = now() - startTime; double elapsed = now() - startTime;
std::string s = format("%.2f MB/s bytes in %.2f MB/s bytes out %.2f/s completed sessions in %.2f/s completed sessions out", std::string s = format("%.2f MB/s bytes in %.2f MB/s bytes out %.2f/s completed sessions in %.2f/s completed sessions out ",
bytesReceived / elapsed / 1e6, bytesSent / elapsed / 1e6, sessionsIn / elapsed, sessionsOut / elapsed); bytesReceived / elapsed / 1e6, bytesSent / elapsed / 1e6, sessionsIn / elapsed, sessionsOut / elapsed);
s += format("Total Errors %d connect=%d accept=%d session=%d",
connectErrors + acceptErrors + sessionErrors, connectErrors, acceptErrors, sessionErrors);
bytesSent = 0; bytesSent = 0;
bytesReceived = 0; bytesReceived = 0;
sessionsIn = 0; sessionsIn = 0;
@ -274,13 +313,16 @@ struct P2PNetworkTest {
P2PNetworkTest() {} P2PNetworkTest() {}
P2PNetworkTest(std::string listenerAddresses, std::string remoteAddresses, int connectionsOut, RandomIntRange msgBytes, RandomIntRange idleMilliseconds, int sendRecvSize) P2PNetworkTest(std::string listenerAddresses, std::string remoteAddresses, int connectionsOut, RandomIntRange sendMsgBytes, RandomIntRange recvMsgBytes, RandomIntRange idleMilliseconds, RandomIntRange waitReadMilliseconds, RandomIntRange waitWriteMilliseconds)
: connectionsOut(connectionsOut), msgBytes(msgBytes), idleMilliseconds(idleMilliseconds), sendRecvSize(sendRecvSize) { : connectionsOut(connectionsOut), sendMsgBytes(sendMsgBytes), recvMsgBytes(recvMsgBytes), idleMilliseconds(idleMilliseconds), waitReadMilliseconds(waitReadMilliseconds), waitWriteMilliseconds(waitWriteMilliseconds) {
bytesSent = 0; bytesSent = 0;
bytesReceived = 0; bytesReceived = 0;
sessionsIn = 0; sessionsIn = 0;
sessionsOut = 0; sessionsOut = 0;
msgBuffer = nondeterministicRandom()->randomAlphaNumeric(msgBytes.max); connectErrors = 0;
acceptErrors = 0;
sessionErrors = 0;
msgBuffer = makeString(std::max(sendMsgBytes.max, recvMsgBytes.max));
if(!remoteAddresses.empty()) { if(!remoteAddresses.empty()) {
remotes = NetworkAddress::parseList(remoteAddresses); remotes = NetworkAddress::parseList(remoteAddresses);
@ -298,7 +340,7 @@ struct P2PNetworkTest {
} }
ACTOR static Future<Void> readMsg(P2PNetworkTest *self, Reference<IConnection> conn) { ACTOR static Future<Void> readMsg(P2PNetworkTest *self, Reference<IConnection> conn) {
state Standalone<StringRef> buffer = makeString(self->sendRecvSize); state Standalone<StringRef> buffer = makeString(FLOW_KNOBS->MAX_PACKET_SEND_BYTES);
state int bytesToRead = sizeof(int); state int bytesToRead = sizeof(int);
state int writeOffset = 0; state int writeOffset = 0;
state bool gotHeader = false; state bool gotHeader = false;
@ -306,6 +348,11 @@ struct P2PNetworkTest {
// Fill buffer sequentially until the initial bytesToRead is read (or more), then read // Fill buffer sequentially until the initial bytesToRead is read (or more), then read
// intended message size and add it to bytesToRead, continue if needed until bytesToRead is 0. // intended message size and add it to bytesToRead, continue if needed until bytesToRead is 0.
loop { loop {
int stutter = self->waitReadMilliseconds.get();
if(stutter > 0) {
wait(delay(stutter / 1e3));
}
int len = conn->read((uint8_t *)buffer.begin() + writeOffset, (uint8_t *)buffer.end()); int len = conn->read((uint8_t *)buffer.begin() + writeOffset, (uint8_t *)buffer.end());
bytesToRead -= len; bytesToRead -= len;
self->bytesReceived += len; self->bytesReceived += len;
@ -313,8 +360,8 @@ struct P2PNetworkTest {
// If no size header yet but there are enough bytes, read the size // If no size header yet but there are enough bytes, read the size
if(!gotHeader && bytesToRead <= 0) { if(!gotHeader && bytesToRead <= 0) {
gotHeader = true; gotHeader = true;
bytesToRead += *(int *)buffer.begin(); bytesToRead += *(int *)buffer.begin();
} }
if(gotHeader) { if(gotHeader) {
@ -334,46 +381,65 @@ struct P2PNetworkTest {
return Void(); return Void();
} }
ACTOR static Future<Void> writeMsg(P2PNetworkTest *self, Reference<IConnection> conn) { ACTOR static Future<Void> writeMsg(P2PNetworkTest *self, Reference<IConnection> conn, bool incoming) {
state UnsentPacketQueue packets; state UnsentPacketQueue packets;
state int msgSize = self->msgBytes.get(); state int msgSize = (incoming ? self->recvMsgBytes : self->sendMsgBytes).get();
PacketWriter writer(packets.getWriteBuffer(), nullptr, Unversioned()); PacketWriter writer(packets.getWriteBuffer(msgSize), nullptr, Unversioned());
writer.serializeBinaryItem(msgSize); writer.serializeBinaryItem(msgSize);
writer.serializeBytes(self->msgBuffer.substr(0, msgSize)); writer.serializeBytes(self->msgBuffer.substr(0, msgSize));
loop { state int bytesBeforeDelay = FLOW_KNOBS->MAX_PACKET_SEND_BYTES;
wait(conn->onWritable());
wait( delay( 0, TaskPriority::WriteSocket ) );
int len = conn->write(packets.getUnsent(), self->sendRecvSize);
self->bytesSent += len;
packets.sent(len);
if(packets.empty()) loop {
int stutter = self->waitWriteMilliseconds.get();
if(stutter > 0) {
wait(delay(stutter / 1e3));
}
int sent = conn->write(packets.getUnsent(), bytesBeforeDelay);
if(sent != 0) {
self->bytesSent += sent;
bytesBeforeDelay -= sent;
packets.sent(sent);
}
if(packets.empty()) {
break; break;
}
wait(conn->onWritable());
if(bytesBeforeDelay <= 0) {
wait(delay(0, TaskPriority::WriteSocket));
bytesBeforeDelay = FLOW_KNOBS->MAX_PACKET_SEND_BYTES;
}
} }
return Void(); return Void();
} }
ACTOR static Future<Void> doSession(P2PNetworkTest *self, Reference<IConnection> conn, bool accept) { ACTOR static Future<Void> doSession(P2PNetworkTest *self, Reference<IConnection> conn, bool incoming) {
try { try {
if(accept) { if(incoming) {
wait(conn->acceptHandshake()); wait(conn->acceptHandshake());
} else { } else {
wait(conn->connectHandshake()); wait(conn->connectHandshake());
} }
wait(readMsg(self, conn) && writeMsg(self, conn)); wait(readMsg(self, conn) && writeMsg(self, conn, incoming));
wait(delay(self->idleMilliseconds.get() / 1e3)); wait(delay(self->idleMilliseconds.get() / 1e3));
conn->close(); conn->close();
if(accept) { if(incoming) {
++self->sessionsIn; ++self->sessionsIn;
} else { } else {
++self->sessionsOut; ++self->sessionsOut;
} }
} catch(Error &e) { } catch(Error &e) {
printf("doSession: error %s on remote %s\n", e.what(), conn->getPeerAddress().toString().c_str()); ++self->sessionErrors;
TraceEvent(SevError, incoming ? "P2PIncomingSessionError" : "P2POutgoingSessionError")
.detail("Remote", conn->getPeerAddress())
.error(e);
} }
return Void(); return Void();
@ -389,7 +455,11 @@ struct P2PNetworkTest {
//printf("Connected to %s\n", remote.toString().c_str()); //printf("Connected to %s\n", remote.toString().c_str());
wait(doSession(self, conn, false)); wait(doSession(self, conn, false));
} catch(Error &e) { } catch(Error &e) {
printf("outgoing: error %s on remote %s\n", e.what(), remote.toString().c_str()); ++self->connectErrors;
TraceEvent(SevError, "P2POutgoingError")
.detail("Remote", remote)
.error(e);
wait(delay(1));
} }
} }
} }
@ -405,7 +475,11 @@ struct P2PNetworkTest {
//printf("Connected from %s\n", conn->getPeerAddress().toString().c_str()); //printf("Connected from %s\n", conn->getPeerAddress().toString().c_str());
sessions.add(doSession(self, conn, true)); sessions.add(doSession(self, conn, true));
} catch(Error &e) { } catch(Error &e) {
printf("incoming: error %s on listener %s\n", e.what(), listener->getListenAddress().toString().c_str()); ++self->acceptErrors;
TraceEvent(SevError, "P2PIncomingError")
.detail("Listener", listener->getListenAddress())
.error(e);
wait(delay(1));
} }
} }
} }
@ -416,9 +490,12 @@ struct P2PNetworkTest {
self->startTime = now(); self->startTime = now();
printf("%d listeners, %d remotes, %d outgoing connections\n", self->listeners.size(), self->remotes.size(), self->connectionsOut); printf("%d listeners, %d remotes, %d outgoing connections\n", self->listeners.size(), self->remotes.size(), self->connectionsOut);
printf("Message size %d to %d bytes\n", self->msgBytes.min, self->msgBytes.max); printf("Send message size: %s\n", self->sendMsgBytes.toString().c_str());
printf("Post exchange delay %f to %f seconds\n", self->idleMilliseconds.min / 1e3, self->idleMilliseconds.max / 1e3); printf("Receive message size: %s\n", self->recvMsgBytes.toString().c_str());
printf("Send/Recv size %d bytes\n", self->sendRecvSize); printf("Delay before socket read: %s\n", self->waitReadMilliseconds.toString().c_str());
printf("Delay before socket write: %s\n", self->waitWriteMilliseconds.toString().c_str());
printf("Delay before session close: %s\n", self->idleMilliseconds.toString().c_str());
printf("Send/Recv size %d bytes\n", FLOW_KNOBS->MAX_PACKET_SEND_BYTES);
for(auto n : self->remotes) { for(auto n : self->remotes) {
printf("Remote: %s\n", n.toString().c_str()); printf("Remote: %s\n", n.toString().c_str());
@ -436,7 +513,7 @@ struct P2PNetworkTest {
} }
loop { loop {
wait(delay(1.0)); wait(delay(1.0, TaskPriority::Max));
printf("%s\n", self->statsString().c_str()); printf("%s\n", self->statsString().c_str());
} }
} }
@ -460,12 +537,14 @@ std::string getEnvStr(const char *name, std::string defaultValue = "") {
// TODO: Remove this hacky thing and make a "networkp2ptest" role in fdbserver // TODO: Remove this hacky thing and make a "networkp2ptest" role in fdbserver
TEST_CASE("!p2ptest") { TEST_CASE("!p2ptest") {
state P2PNetworkTest p2p( state P2PNetworkTest p2p(
getEnvStr("listenerAddresses", "127.0.0.1:5000,127.0.0.1:5001:tls"), getEnvStr("listenerAddresses", "127.0.0.1:5001:tls"),
getEnvStr("remoteAddresses", "127.0.0.1:5000,127.0.0.1:5001:tls"), getEnvStr("remoteAddresses", "127.0.0.1:5001:tls"),
getEnvInt("connectionsOut", 2), getEnvInt("connectionsOut", 1),
{getEnvInt("minMsgBytes", 0), getEnvInt("maxMsgBytes", 1000000)}, getEnvStr("sendMsgBytes", "1000000000:1000000000"),
{getEnvInt("minIdleMilliseconds", 500), getEnvInt("maxIdleMilliseconds", 1000)}, getEnvStr("recvMsgBytes", "0:0"),
getEnvInt("sendRecvSize", 32000) getEnvStr("idleMilliseconds", "0:0"),
getEnvStr("waitReadMilliseconds", "0:0"),
getEnvStr("waitWriteMilliseconds", "0:0")
); );
wait(p2p.run()); wait(p2p.run());

View File

@ -527,6 +527,7 @@ void releaseAllThreadMagazines() {
FastAllocator<2048>::releaseThreadMagazines(); FastAllocator<2048>::releaseThreadMagazines();
FastAllocator<4096>::releaseThreadMagazines(); FastAllocator<4096>::releaseThreadMagazines();
FastAllocator<8192>::releaseThreadMagazines(); FastAllocator<8192>::releaseThreadMagazines();
FastAllocator<16384>::releaseThreadMagazines();
} }
int64_t getTotalUnusedAllocatedMemory() { int64_t getTotalUnusedAllocatedMemory() {
@ -543,6 +544,7 @@ int64_t getTotalUnusedAllocatedMemory() {
unusedMemory += FastAllocator<2048>::getApproximateMemoryUnused(); unusedMemory += FastAllocator<2048>::getApproximateMemoryUnused();
unusedMemory += FastAllocator<4096>::getApproximateMemoryUnused(); unusedMemory += FastAllocator<4096>::getApproximateMemoryUnused();
unusedMemory += FastAllocator<8192>::getApproximateMemoryUnused(); unusedMemory += FastAllocator<8192>::getApproximateMemoryUnused();
unusedMemory += FastAllocator<16384>::getApproximateMemoryUnused();
return unusedMemory; return unusedMemory;
} }
@ -558,3 +560,4 @@ template class FastAllocator<1024>;
template class FastAllocator<2048>; template class FastAllocator<2048>;
template class FastAllocator<4096>; template class FastAllocator<4096>;
template class FastAllocator<8192>; template class FastAllocator<8192>;
template class FastAllocator<16384>;

View File

@ -216,6 +216,7 @@ public:
if (size <= 2048) return FastAllocator<2048>::allocate(); if (size <= 2048) return FastAllocator<2048>::allocate();
if (size <= 4096) return FastAllocator<4096>::allocate(); if (size <= 4096) return FastAllocator<4096>::allocate();
if (size <= 8192) return FastAllocator<8192>::allocate(); if (size <= 8192) return FastAllocator<8192>::allocate();
if (size <= 16384) return FastAllocator<16384>::allocate();
return new uint8_t[size]; return new uint8_t[size];
} }
@ -231,6 +232,7 @@ inline void freeFast(int size, void* ptr) {
if (size <= 2048) return FastAllocator<2048>::release(ptr); if (size <= 2048) return FastAllocator<2048>::release(ptr);
if (size <= 4096) return FastAllocator<4096>::release(ptr); if (size <= 4096) return FastAllocator<4096>::release(ptr);
if (size <= 8192) return FastAllocator<8192>::release(ptr); if (size <= 8192) return FastAllocator<8192>::release(ptr);
if (size <= 16384) return FastAllocator<16384>::release(ptr);
delete[](uint8_t*)ptr; delete[](uint8_t*)ptr;
} }

View File

@ -49,7 +49,7 @@ void PacketWriter::serializeBytesAcrossBoundary(const void* data, int bytes) {
if (!bytes) break; if (!bytes) break;
data = (uint8_t*)data + b; data = (uint8_t*)data + b;
nextBuffer(); nextBuffer(bytes);
} }
} }
@ -69,6 +69,8 @@ void PacketWriter::nextBuffer(size_t size) {
} }
} }
// Adds exactly bytes of unwritten length to the buffer, possibly across packet buffer boundaries,
// and initializes buf to point to the packet buffer(s) that contain the unwritten space
void PacketWriter::writeAhead( int bytes, struct SplitBuffer* buf ) { void PacketWriter::writeAhead( int bytes, struct SplitBuffer* buf ) {
if (bytes <= buffer->bytes_unwritten()) { if (bytes <= buffer->bytes_unwritten()) {
buf->begin = buffer->data() + buffer->bytes_written; buf->begin = buffer->data() + buffer->bytes_written;
@ -79,9 +81,10 @@ void PacketWriter::writeAhead( int bytes, struct SplitBuffer* buf ) {
buf->begin = buffer->data() + buffer->bytes_written; buf->begin = buffer->data() + buffer->bytes_written;
buf->first_length = buffer->bytes_unwritten(); buf->first_length = buffer->bytes_unwritten();
buffer->bytes_written = buffer->size(); buffer->bytes_written = buffer->size();
nextBuffer(); size_t remaining = bytes - buf->first_length;
nextBuffer(remaining);
buf->next = buffer->data(); buf->next = buffer->data();
buffer->bytes_written = bytes - buf->first_length; buffer->bytes_written = remaining;
} }
} }
@ -172,7 +175,7 @@ PacketBuffer* ReliablePacketList::compact(PacketBuffer* into, PacketBuffer* end)
if (c->buffer == end /*&& c->begin>=c->buffer->bytes_written*/) // quit when we hit the unsent range if (c->buffer == end /*&& c->begin>=c->buffer->bytes_written*/) // quit when we hit the unsent range
return into; return into;
if (into->bytes_written == into->size()) { if (into->bytes_written == into->size()) {
into->next = PacketBuffer::create(); into->next = PacketBuffer::create(into->size());
into = into->nextPacketBuffer(); into = into->nextPacketBuffer();
} }

View File

@ -44,10 +44,10 @@ public:
~UnsentPacketQueue() { discardAll(); } ~UnsentPacketQueue() { discardAll(); }
// Get a PacketBuffer to write new packets into // Get a PacketBuffer to write new packets into
PacketBuffer* getWriteBuffer() { PacketBuffer* getWriteBuffer(size_t sizeHint = 0) {
if (!unsent_last) { if (!unsent_last) {
ASSERT(!unsent_first); ASSERT(!unsent_first);
unsent_first = unsent_last = PacketBuffer::create(); unsent_first = unsent_last = PacketBuffer::create(sizeHint);
}; };
return unsent_last; return unsent_last;
} }

View File

@ -669,12 +669,16 @@ struct SendBuffer {
uint8_t const* data; uint8_t const* data;
SendBuffer* next; SendBuffer* next;
int bytes_written, bytes_sent; int bytes_written, bytes_sent;
int bytes_unsent() const {
return bytes_written - bytes_sent;
}
}; };
struct PacketBuffer : SendBuffer { struct PacketBuffer : SendBuffer {
private: private:
int reference_count; int reference_count;
uint32_t size_; uint32_t size_;
static constexpr size_t PACKET_BUFFER_MIN_SIZE = 16384;
static constexpr size_t PACKET_BUFFER_OVERHEAD = 32; static constexpr size_t PACKET_BUFFER_OVERHEAD = 32;
public: public:
@ -691,9 +695,9 @@ private:
public: public:
static PacketBuffer* create(size_t size = 0) { static PacketBuffer* create(size_t size = 0) {
size = std::max(size, 4096 - PACKET_BUFFER_OVERHEAD); size = std::max(size, PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD);
if (size == 4096 - PACKET_BUFFER_OVERHEAD) { if (size == PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD) {
return new (FastAllocator<4096>::allocate()) PacketBuffer{ size }; return new (FastAllocator<PACKET_BUFFER_MIN_SIZE>::allocate()) PacketBuffer{ size };
} }
uint8_t* mem = new uint8_t[size + PACKET_BUFFER_OVERHEAD]; uint8_t* mem = new uint8_t[size + PACKET_BUFFER_OVERHEAD];
return new (mem) PacketBuffer{ size }; return new (mem) PacketBuffer{ size };
@ -702,8 +706,8 @@ public:
void addref() { ++reference_count; } void addref() { ++reference_count; }
void delref() { void delref() {
if (!--reference_count) { if (!--reference_count) {
if (size_ == 4096 - PACKET_BUFFER_OVERHEAD) { if (size_ == PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD) {
FastAllocator<4096>::release(this); FastAllocator<PACKET_BUFFER_MIN_SIZE>::release(this);
} else { } else {
delete[] this; delete[] this;
} }