diff --git a/fdbbackup/backup.actor.cpp b/fdbbackup/backup.actor.cpp index 0076ca9e05..a77c39d623 100644 --- a/fdbbackup/backup.actor.cpp +++ b/fdbbackup/backup.actor.cpp @@ -3849,7 +3849,8 @@ int main(int argc, char* argv[]) { << FastAllocator<1024>::pageCount << " " << FastAllocator<2048>::pageCount << " " << FastAllocator<4096>::pageCount << " " - << FastAllocator<8192>::pageCount << endl; + << FastAllocator<8192>::pageCount << " " + << FastAllocator<16384>::pageCount << endl; vector< std::pair > typeNames; for( auto i = allocInstr.begin(); i != allocInstr.end(); ++i ) { diff --git a/fdbclient/AsyncFileBlobStore.actor.h b/fdbclient/AsyncFileBlobStore.actor.h index 62d91dc2bf..05690a5d4b 100644 --- a/fdbclient/AsyncFileBlobStore.actor.h +++ b/fdbclient/AsyncFileBlobStore.actor.h @@ -59,7 +59,7 @@ public: virtual void delref() { ReferenceCounted::delref(); } struct Part : ReferenceCounted { - Part(int n) : number(n), writer(content.getWriteBuffer(), NULL, Unversioned()), length(0) { + Part(int n, int minSize) : number(n), writer(content.getWriteBuffer(minSize), NULL, Unversioned()), length(0) { etag = std::string(); ::MD5_Init(&content_md5_buf); } @@ -231,7 +231,7 @@ private: // Make a new part to write to if(startNew) - f->m_parts.push_back(Reference(new Part(f->m_parts.size() + 1))); + f->m_parts.push_back(Reference(new Part(f->m_parts.size() + 1, f->m_bstore->knobs.multipart_min_part_size))); return Void(); } @@ -247,7 +247,7 @@ public: : m_bstore(bstore), m_bucket(bucket), m_object(object), m_cursor(0), m_concurrentUploads(bstore->knobs.concurrent_writes_per_file) { // Add first part - m_parts.push_back(Reference(new Part(1))); + m_parts.push_back(Reference(new Part(1, m_bstore->knobs.multipart_min_part_size))); } }; diff --git a/fdbclient/BlobStore.actor.cpp b/fdbclient/BlobStore.actor.cpp index ec27791e15..9648556d22 100644 --- a/fdbclient/BlobStore.actor.cpp +++ b/fdbclient/BlobStore.actor.cpp @@ -1050,7 +1050,7 @@ ACTOR Future writeEntireFileFromBuffer_impl(Reference b ACTOR Future writeEntireFile_impl(Reference bstore, std::string bucket, std::string object, std::string content) { state UnsentPacketQueue packets; - PacketWriter pw(packets.getWriteBuffer(), NULL, Unversioned()); + PacketWriter pw(packets.getWriteBuffer(content.size()), NULL, Unversioned()); pw.serializeBytes(content); if(content.size() > bstore->knobs.multipart_max_part_size) throw file_too_large(); @@ -1173,7 +1173,7 @@ ACTOR Future finishMultiPartUpload_impl(Reference bstor std::string resource = format("/%s/%s?uploadId=%s", bucket.c_str(), object.c_str(), uploadID.c_str()); HTTP::Headers headers; - PacketWriter pw(part_list.getWriteBuffer(), NULL, Unversioned()); + PacketWriter pw(part_list.getWriteBuffer(manifest.size()), NULL, Unversioned()); pw.serializeBytes(manifest); Reference r = wait(bstore->doRequest("POST", resource, headers, &part_list, manifest.size(), {200})); // TODO: In the event that the client times out just before the request completes (so the client is unaware) then the next retry diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 7c9e3ed912..e3c8f6b4a1 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -414,20 +414,39 @@ ACTOR Future connectionWriter( Reference self, ReferenceMAX_PACKET_SEND_BYTES; + // Send until there is nothing left to send loop { lastWriteTime = now(); - int sent = conn->write(self->unsent.getUnsent(), /* limit= */ FLOW_KNOBS->MAX_PACKET_SEND_BYTES); + int sent = conn->write(self->unsent.getUnsent(), bytesBeforeDelay); + if (sent) { self->transport->bytesSent += sent; + bytesBeforeDelay -= sent; self->unsent.sent(sent); } - if (self->unsent.empty()) break; + + if (self->unsent.empty()) { + break; + } TEST(true); // We didn't write everything, so apparently the write buffer is full. Wait for it to be nonfull. wait( conn->onWritable() ); - wait( yield(TaskPriority::WriteSocket) ); + + // After the first write on a connection, onWritable() must be called before every write, because + // otherwise for some reason with TLS connections calling write() too much can result in a socket + // closure rather than returning 0 or some kind of WouldBlock error. This, combined with the fact + // that TLS connections will only read from the first PacketBuffer in the chain, leads to this + // loop being executed far more often than for non-TLS connections. To mitigate this, and since + // MAX_PACKET_SEND_BYTES is intended to be the most bytes from the PacketBuffer chain that can be + // sent without interruption, we will only yield one we have sent MAX_PACKET_SEND_BYTES since the + // last yield. + if (bytesBeforeDelay <= 0) { + wait(delay(0, TaskPriority::WriteSocket)); + bytesBeforeDelay = FLOW_KNOBS->MAX_PACKET_SEND_BYTES; + } } // Wait until there is something to send diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index 576e811c50..5ff584de96 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -2015,7 +2015,8 @@ int main(int argc, char* argv[]) { << FastAllocator<1024>::pageCount << " " << FastAllocator<2048>::pageCount << " " << FastAllocator<4096>::pageCount << " " - << FastAllocator<8192>::pageCount << std::endl; + << FastAllocator<8192>::pageCount << " " + << FastAllocator<16384>::pageCount << std::endl; vector< std::pair > typeNames; for( auto i = allocInstr.begin(); i != allocInstr.end(); ++i ) { diff --git a/fdbserver/networktest.actor.cpp b/fdbserver/networktest.actor.cpp index b86606438f..1451685a2b 100644 --- a/fdbserver/networktest.actor.cpp +++ b/fdbserver/networktest.actor.cpp @@ -240,30 +240,69 @@ ACTOR Future networkTestClient( std:: string testServers ) { struct RandomIntRange { int min; int max; + + RandomIntRange(int low = 0, int high = 0) : min(low), max(high) { + } + + // Accepts strings of the form "min:max" or "N" + // where N will be used for both min and max + RandomIntRange(std::string str) { + StringRef high = str; + StringRef low = high.eat(":"); + if(high.size() == 0) { + high = low; + } + min = low.size() == 0 ? 0 : atol(low.toString().c_str()); + max = high.size() == 0 ? 0 : atol(high.toString().c_str()); + if(min > max) { + std::swap(min, max); + } + } + int get() const { - return nondeterministicRandom()->randomInt(min, max + 1); + return (max == 0) ? 0 : nondeterministicRandom()->randomInt(min, max + 1); + } + + std::string toString() const { + return format("%d:%d", min, max); } }; struct P2PNetworkTest { + // Addresses to listen on std::vector> listeners; + // Addresses to randomly connect to std::vector remotes; + // Number of outgoing connections to maintain int connectionsOut; - RandomIntRange msgBytes; + // Message size range to send on outgoing established connections + RandomIntRange sendMsgBytes; + // Message size range to send on incoming established connections + RandomIntRange recvMsgBytes; + // Delay after message send and receive are complete before closing connection RandomIntRange idleMilliseconds; + // Random delay before socket reads + RandomIntRange waitReadMilliseconds; + // Random delay before socket writes + RandomIntRange waitWriteMilliseconds; double startTime; int64_t bytesSent; int64_t bytesReceived; int sessionsIn; int sessionsOut; + int connectErrors; + int acceptErrors; + int sessionErrors; + Standalone msgBuffer; - int sendRecvSize; std::string statsString() { double elapsed = now() - startTime; - std::string s = format("%.2f MB/s bytes in %.2f MB/s bytes out %.2f/s completed sessions in %.2f/s completed sessions out", + std::string s = format("%.2f MB/s bytes in %.2f MB/s bytes out %.2f/s completed sessions in %.2f/s completed sessions out ", bytesReceived / elapsed / 1e6, bytesSent / elapsed / 1e6, sessionsIn / elapsed, sessionsOut / elapsed); + s += format("Total Errors %d connect=%d accept=%d session=%d", + connectErrors + acceptErrors + sessionErrors, connectErrors, acceptErrors, sessionErrors); bytesSent = 0; bytesReceived = 0; sessionsIn = 0; @@ -274,13 +313,16 @@ struct P2PNetworkTest { P2PNetworkTest() {} - P2PNetworkTest(std::string listenerAddresses, std::string remoteAddresses, int connectionsOut, RandomIntRange msgBytes, RandomIntRange idleMilliseconds, int sendRecvSize) - : connectionsOut(connectionsOut), msgBytes(msgBytes), idleMilliseconds(idleMilliseconds), sendRecvSize(sendRecvSize) { + P2PNetworkTest(std::string listenerAddresses, std::string remoteAddresses, int connectionsOut, RandomIntRange sendMsgBytes, RandomIntRange recvMsgBytes, RandomIntRange idleMilliseconds, RandomIntRange waitReadMilliseconds, RandomIntRange waitWriteMilliseconds) + : connectionsOut(connectionsOut), sendMsgBytes(sendMsgBytes), recvMsgBytes(recvMsgBytes), idleMilliseconds(idleMilliseconds), waitReadMilliseconds(waitReadMilliseconds), waitWriteMilliseconds(waitWriteMilliseconds) { bytesSent = 0; bytesReceived = 0; sessionsIn = 0; sessionsOut = 0; - msgBuffer = nondeterministicRandom()->randomAlphaNumeric(msgBytes.max); + connectErrors = 0; + acceptErrors = 0; + sessionErrors = 0; + msgBuffer = makeString(std::max(sendMsgBytes.max, recvMsgBytes.max)); if(!remoteAddresses.empty()) { remotes = NetworkAddress::parseList(remoteAddresses); @@ -298,7 +340,7 @@ struct P2PNetworkTest { } ACTOR static Future readMsg(P2PNetworkTest *self, Reference conn) { - state Standalone buffer = makeString(self->sendRecvSize); + state Standalone buffer = makeString(FLOW_KNOBS->MAX_PACKET_SEND_BYTES); state int bytesToRead = sizeof(int); state int writeOffset = 0; state bool gotHeader = false; @@ -306,6 +348,11 @@ struct P2PNetworkTest { // Fill buffer sequentially until the initial bytesToRead is read (or more), then read // intended message size and add it to bytesToRead, continue if needed until bytesToRead is 0. loop { + int stutter = self->waitReadMilliseconds.get(); + if(stutter > 0) { + wait(delay(stutter / 1e3)); + } + int len = conn->read((uint8_t *)buffer.begin() + writeOffset, (uint8_t *)buffer.end()); bytesToRead -= len; self->bytesReceived += len; @@ -313,8 +360,8 @@ struct P2PNetworkTest { // If no size header yet but there are enough bytes, read the size if(!gotHeader && bytesToRead <= 0) { - gotHeader = true; - bytesToRead += *(int *)buffer.begin(); + gotHeader = true; + bytesToRead += *(int *)buffer.begin(); } if(gotHeader) { @@ -334,46 +381,65 @@ struct P2PNetworkTest { return Void(); } - ACTOR static Future writeMsg(P2PNetworkTest *self, Reference conn) { + ACTOR static Future writeMsg(P2PNetworkTest *self, Reference conn, bool incoming) { state UnsentPacketQueue packets; - state int msgSize = self->msgBytes.get(); + state int msgSize = (incoming ? self->recvMsgBytes : self->sendMsgBytes).get(); - PacketWriter writer(packets.getWriteBuffer(), nullptr, Unversioned()); + PacketWriter writer(packets.getWriteBuffer(msgSize), nullptr, Unversioned()); writer.serializeBinaryItem(msgSize); writer.serializeBytes(self->msgBuffer.substr(0, msgSize)); - loop { - wait(conn->onWritable()); - wait( delay( 0, TaskPriority::WriteSocket ) ); - int len = conn->write(packets.getUnsent(), self->sendRecvSize); - self->bytesSent += len; - packets.sent(len); + state int bytesBeforeDelay = FLOW_KNOBS->MAX_PACKET_SEND_BYTES; - if(packets.empty()) + loop { + int stutter = self->waitWriteMilliseconds.get(); + if(stutter > 0) { + wait(delay(stutter / 1e3)); + } + + int sent = conn->write(packets.getUnsent(), bytesBeforeDelay); + + if(sent != 0) { + self->bytesSent += sent; + bytesBeforeDelay -= sent; + packets.sent(sent); + } + + if(packets.empty()) { break; + } + + wait(conn->onWritable()); + if(bytesBeforeDelay <= 0) { + wait(delay(0, TaskPriority::WriteSocket)); + bytesBeforeDelay = FLOW_KNOBS->MAX_PACKET_SEND_BYTES; + } } return Void(); } - ACTOR static Future doSession(P2PNetworkTest *self, Reference conn, bool accept) { + ACTOR static Future doSession(P2PNetworkTest *self, Reference conn, bool incoming) { try { - if(accept) { + if(incoming) { wait(conn->acceptHandshake()); } else { wait(conn->connectHandshake()); } - wait(readMsg(self, conn) && writeMsg(self, conn)); + wait(readMsg(self, conn) && writeMsg(self, conn, incoming)); wait(delay(self->idleMilliseconds.get() / 1e3)); conn->close(); - if(accept) { + if(incoming) { ++self->sessionsIn; } else { ++self->sessionsOut; } } catch(Error &e) { - printf("doSession: error %s on remote %s\n", e.what(), conn->getPeerAddress().toString().c_str()); + ++self->sessionErrors; + TraceEvent(SevError, incoming ? "P2PIncomingSessionError" : "P2POutgoingSessionError") + .detail("Remote", conn->getPeerAddress()) + .error(e); } return Void(); @@ -389,7 +455,11 @@ struct P2PNetworkTest { //printf("Connected to %s\n", remote.toString().c_str()); wait(doSession(self, conn, false)); } catch(Error &e) { - printf("outgoing: error %s on remote %s\n", e.what(), remote.toString().c_str()); + ++self->connectErrors; + TraceEvent(SevError, "P2POutgoingError") + .detail("Remote", remote) + .error(e); + wait(delay(1)); } } } @@ -405,7 +475,11 @@ struct P2PNetworkTest { //printf("Connected from %s\n", conn->getPeerAddress().toString().c_str()); sessions.add(doSession(self, conn, true)); } catch(Error &e) { - printf("incoming: error %s on listener %s\n", e.what(), listener->getListenAddress().toString().c_str()); + ++self->acceptErrors; + TraceEvent(SevError, "P2PIncomingError") + .detail("Listener", listener->getListenAddress()) + .error(e); + wait(delay(1)); } } } @@ -416,9 +490,12 @@ struct P2PNetworkTest { self->startTime = now(); printf("%d listeners, %d remotes, %d outgoing connections\n", self->listeners.size(), self->remotes.size(), self->connectionsOut); - printf("Message size %d to %d bytes\n", self->msgBytes.min, self->msgBytes.max); - printf("Post exchange delay %f to %f seconds\n", self->idleMilliseconds.min / 1e3, self->idleMilliseconds.max / 1e3); - printf("Send/Recv size %d bytes\n", self->sendRecvSize); + printf("Send message size: %s\n", self->sendMsgBytes.toString().c_str()); + printf("Receive message size: %s\n", self->recvMsgBytes.toString().c_str()); + printf("Delay before socket read: %s\n", self->waitReadMilliseconds.toString().c_str()); + printf("Delay before socket write: %s\n", self->waitWriteMilliseconds.toString().c_str()); + printf("Delay before session close: %s\n", self->idleMilliseconds.toString().c_str()); + printf("Send/Recv size %d bytes\n", FLOW_KNOBS->MAX_PACKET_SEND_BYTES); for(auto n : self->remotes) { printf("Remote: %s\n", n.toString().c_str()); @@ -436,7 +513,7 @@ struct P2PNetworkTest { } loop { - wait(delay(1.0)); + wait(delay(1.0, TaskPriority::Max)); printf("%s\n", self->statsString().c_str()); } } @@ -460,12 +537,14 @@ std::string getEnvStr(const char *name, std::string defaultValue = "") { // TODO: Remove this hacky thing and make a "networkp2ptest" role in fdbserver TEST_CASE("!p2ptest") { state P2PNetworkTest p2p( - getEnvStr("listenerAddresses", "127.0.0.1:5000,127.0.0.1:5001:tls"), - getEnvStr("remoteAddresses", "127.0.0.1:5000,127.0.0.1:5001:tls"), - getEnvInt("connectionsOut", 2), - {getEnvInt("minMsgBytes", 0), getEnvInt("maxMsgBytes", 1000000)}, - {getEnvInt("minIdleMilliseconds", 500), getEnvInt("maxIdleMilliseconds", 1000)}, - getEnvInt("sendRecvSize", 32000) + getEnvStr("listenerAddresses", "127.0.0.1:5001:tls"), + getEnvStr("remoteAddresses", "127.0.0.1:5001:tls"), + getEnvInt("connectionsOut", 1), + getEnvStr("sendMsgBytes", "1000000000:1000000000"), + getEnvStr("recvMsgBytes", "0:0"), + getEnvStr("idleMilliseconds", "0:0"), + getEnvStr("waitReadMilliseconds", "0:0"), + getEnvStr("waitWriteMilliseconds", "0:0") ); wait(p2p.run()); diff --git a/flow/FastAlloc.cpp b/flow/FastAlloc.cpp index 47846bc76d..e976a949de 100644 --- a/flow/FastAlloc.cpp +++ b/flow/FastAlloc.cpp @@ -527,6 +527,7 @@ void releaseAllThreadMagazines() { FastAllocator<2048>::releaseThreadMagazines(); FastAllocator<4096>::releaseThreadMagazines(); FastAllocator<8192>::releaseThreadMagazines(); + FastAllocator<16384>::releaseThreadMagazines(); } int64_t getTotalUnusedAllocatedMemory() { @@ -543,6 +544,7 @@ int64_t getTotalUnusedAllocatedMemory() { unusedMemory += FastAllocator<2048>::getApproximateMemoryUnused(); unusedMemory += FastAllocator<4096>::getApproximateMemoryUnused(); unusedMemory += FastAllocator<8192>::getApproximateMemoryUnused(); + unusedMemory += FastAllocator<16384>::getApproximateMemoryUnused(); return unusedMemory; } @@ -558,3 +560,4 @@ template class FastAllocator<1024>; template class FastAllocator<2048>; template class FastAllocator<4096>; template class FastAllocator<8192>; +template class FastAllocator<16384>; diff --git a/flow/FastAlloc.h b/flow/FastAlloc.h index 2a730a1367..53d95b4c83 100644 --- a/flow/FastAlloc.h +++ b/flow/FastAlloc.h @@ -216,6 +216,7 @@ public: if (size <= 2048) return FastAllocator<2048>::allocate(); if (size <= 4096) return FastAllocator<4096>::allocate(); if (size <= 8192) return FastAllocator<8192>::allocate(); + if (size <= 16384) return FastAllocator<16384>::allocate(); return new uint8_t[size]; } @@ -231,6 +232,7 @@ inline void freeFast(int size, void* ptr) { if (size <= 2048) return FastAllocator<2048>::release(ptr); if (size <= 4096) return FastAllocator<4096>::release(ptr); if (size <= 8192) return FastAllocator<8192>::release(ptr); + if (size <= 16384) return FastAllocator<16384>::release(ptr); delete[](uint8_t*)ptr; } diff --git a/flow/Net2Packet.cpp b/flow/Net2Packet.cpp index 74ce689be6..80bd7194a2 100644 --- a/flow/Net2Packet.cpp +++ b/flow/Net2Packet.cpp @@ -49,7 +49,7 @@ void PacketWriter::serializeBytesAcrossBoundary(const void* data, int bytes) { if (!bytes) break; data = (uint8_t*)data + b; - nextBuffer(); + nextBuffer(bytes); } } @@ -69,6 +69,8 @@ void PacketWriter::nextBuffer(size_t size) { } } +// Adds exactly bytes of unwritten length to the buffer, possibly across packet buffer boundaries, +// and initializes buf to point to the packet buffer(s) that contain the unwritten space void PacketWriter::writeAhead( int bytes, struct SplitBuffer* buf ) { if (bytes <= buffer->bytes_unwritten()) { buf->begin = buffer->data() + buffer->bytes_written; @@ -79,9 +81,10 @@ void PacketWriter::writeAhead( int bytes, struct SplitBuffer* buf ) { buf->begin = buffer->data() + buffer->bytes_written; buf->first_length = buffer->bytes_unwritten(); buffer->bytes_written = buffer->size(); - nextBuffer(); + size_t remaining = bytes - buf->first_length; + nextBuffer(remaining); buf->next = buffer->data(); - buffer->bytes_written = bytes - buf->first_length; + buffer->bytes_written = remaining; } } @@ -172,7 +175,7 @@ PacketBuffer* ReliablePacketList::compact(PacketBuffer* into, PacketBuffer* end) if (c->buffer == end /*&& c->begin>=c->buffer->bytes_written*/) // quit when we hit the unsent range return into; if (into->bytes_written == into->size()) { - into->next = PacketBuffer::create(); + into->next = PacketBuffer::create(into->size()); into = into->nextPacketBuffer(); } diff --git a/flow/Net2Packet.h b/flow/Net2Packet.h index 3d9a0f99b3..377f839a77 100644 --- a/flow/Net2Packet.h +++ b/flow/Net2Packet.h @@ -44,10 +44,10 @@ public: ~UnsentPacketQueue() { discardAll(); } // Get a PacketBuffer to write new packets into - PacketBuffer* getWriteBuffer() { + PacketBuffer* getWriteBuffer(size_t sizeHint = 0) { if (!unsent_last) { ASSERT(!unsent_first); - unsent_first = unsent_last = PacketBuffer::create(); + unsent_first = unsent_last = PacketBuffer::create(sizeHint); }; return unsent_last; } diff --git a/flow/serialize.h b/flow/serialize.h index 1adc93dfa1..34a2c95b4e 100644 --- a/flow/serialize.h +++ b/flow/serialize.h @@ -669,12 +669,16 @@ struct SendBuffer { uint8_t const* data; SendBuffer* next; int bytes_written, bytes_sent; + int bytes_unsent() const { + return bytes_written - bytes_sent; + } }; struct PacketBuffer : SendBuffer { private: int reference_count; uint32_t size_; + static constexpr size_t PACKET_BUFFER_MIN_SIZE = 16384; static constexpr size_t PACKET_BUFFER_OVERHEAD = 32; public: @@ -691,9 +695,9 @@ private: public: static PacketBuffer* create(size_t size = 0) { - size = std::max(size, 4096 - PACKET_BUFFER_OVERHEAD); - if (size == 4096 - PACKET_BUFFER_OVERHEAD) { - return new (FastAllocator<4096>::allocate()) PacketBuffer{ size }; + size = std::max(size, PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD); + if (size == PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD) { + return new (FastAllocator::allocate()) PacketBuffer{ size }; } uint8_t* mem = new uint8_t[size + PACKET_BUFFER_OVERHEAD]; return new (mem) PacketBuffer{ size }; @@ -702,8 +706,8 @@ public: void addref() { ++reference_count; } void delref() { if (!--reference_count) { - if (size_ == 4096 - PACKET_BUFFER_OVERHEAD) { - FastAllocator<4096>::release(this); + if (size_ == PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD) { + FastAllocator::release(this); } else { delete[] this; }