From 57344922a44a17ab49803ae80300a66f19146778 Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Mon, 22 Jun 2020 03:37:16 -0700 Subject: [PATCH] Added a network connection test and very hacky interface to it. The test simultaneously listens on a set of addresses, connects to a random set of addresses to maintain a configured number of connections, and for each session executes a two way data exchange followed by a random delay and close. --- fdbserver/networktest.actor.cpp | 212 ++++++++++++++++++++++++++++++++ 1 file changed, 212 insertions(+) diff --git a/fdbserver/networktest.actor.cpp b/fdbserver/networktest.actor.cpp index fff160d8d7..1da0341c6e 100644 --- a/fdbserver/networktest.actor.cpp +++ b/fdbserver/networktest.actor.cpp @@ -21,6 +21,9 @@ #include "fdbserver/NetworkTest.h" #include "flow/Knobs.h" #include "flow/actorcompiler.h" // This must be the last #include. +#include "flow/ActorCollection.h" +#include "flow/UnitTest.h" +#include UID WLTOKEN_NETWORKTEST( -1, 2 ); @@ -233,3 +236,212 @@ ACTOR Future networkTestClient( std:: string testServers ) { wait( waitForAll( clients ) ); return Void(); } + +struct RandomIntRange { + int min; + int max; + int get() const { + return nondeterministicRandom()->randomInt(min, max + 1); + } +}; + +struct P2PNetworkTest { + std::vector> listeners; + std::vector remotes; + int connectionsOut; + RandomIntRange msgBytes; + RandomIntRange idleMilliseconds; + + double startTime; + int64_t bytesSent; + int64_t bytesReceived; + int sessionsIn; + int sessionsOut; + Standalone msgBuffer; + + std::string statsString() const { + double elapsed = now() - startTime; + return format("%.2f MB/s bytes in %.2f MB/s bytes out %.2f/s sessions in %.2f/s sessions out", + bytesReceived / elapsed / 1e6, bytesSent / elapsed / 1e6, sessionsIn / elapsed, sessionsOut / elapsed); + } + + P2PNetworkTest() {} + + P2PNetworkTest(std::string listenerAddresses, std::string remoteAddresses, int connectionsOut, RandomIntRange msgBytes, RandomIntRange idleMilliseconds) + : connectionsOut(connectionsOut), msgBytes(msgBytes), idleMilliseconds(idleMilliseconds) { + bytesSent = 0; + bytesReceived = 0; + sessionsIn = 0; + sessionsOut = 0; + msgBuffer = nondeterministicRandom()->randomAlphaNumeric(msgBytes.max); + + if(!remoteAddresses.empty()) { + remotes = NetworkAddress::parseList(remoteAddresses); + } + + if(!listenerAddresses.empty()) { + for(auto a : NetworkAddress::parseList(listenerAddresses)) { + listeners.push_back(INetworkConnections::net()->listen(a)); + } + } + } + + NetworkAddress randomRemote() { + return remotes[nondeterministicRandom()->randomInt(0, remotes.size())]; + } + + ACTOR static Future readMsg(P2PNetworkTest *self, Reference conn) { + state Standalone buffer = makeString(32000); + state int bytesToRead = sizeof(int); + state int writeOffset = 0; + state bool gotHeader = false; + + loop { + int len = conn->read((uint8_t *)buffer.begin() + writeOffset, (uint8_t *)buffer.end()); + bytesToRead -= len; + self->bytesReceived += len; + if(!gotHeader) { + if(bytesToRead <= 0) { + gotHeader = true; + bytesToRead += *(uint64_t *)buffer.begin(); + writeOffset = 0; + } + } else { + if(bytesToRead == 0) { + break; + } + ASSERT(bytesToRead > 0); + writeOffset = 0; + } + + if(len == 0) { + wait(conn->onReadable()); + wait( delay( 0, TaskPriority::ReadSocket ) ); + } + } + + return Void(); + } + + ACTOR static Future writeMsg(P2PNetworkTest *self, Reference conn) { + state UnsentPacketQueue packets; + state int msgSize = self->msgBytes.get(); + + PacketWriter writer(packets.getWriteBuffer(), nullptr, Unversioned()); + writer.serializeBinaryItem(msgSize); + writer.serializeBytes(self->msgBuffer.substr(0, msgSize)); + + loop { + wait(conn->onWritable()); + wait( delay( 0, TaskPriority::WriteSocket ) ); + int len = conn->write(packets.getUnsent(), 32000); + self->bytesSent += len; + packets.sent(len); + + if(packets.empty()) + break; + } + + return Void(); + } + + ACTOR static Future doSession(P2PNetworkTest *self, Reference conn) { + state bool waiting = false; + try { + wait(readMsg(self, conn) && writeMsg(self, conn)); + waiting = true; + wait(delay(self->idleMilliseconds.get() / 1e3)); + conn->close(); + } catch(Error &e) { + if(!waiting) { + printf("Unexpected error: %s on remote %s\n", e.what(), conn->getPeerAddress().toString().c_str()); + throw e; + } + } + + return Void(); + } + + ACTOR static Future outgoing(P2PNetworkTest *self) { + loop { + state NetworkAddress remote = self->randomRemote(); + try { + Reference conn = wait(INetworkConnections::net()->connect(remote)); + //printf("Connected to %s\n", remote.toString().c_str()); + ++self->sessionsOut; + wait(doSession(self, conn)); + } catch(Error &e) { + printf("Unexpected error: %s on remote %s\n", e.what(), remote.toString().c_str()); + } + } + } + + ACTOR static Future incoming(P2PNetworkTest *self, Reference listener) { + state ActorCollection sessions(false); + + loop { + try { + Reference conn = wait(listener->accept()); + //printf("Connected from %s\n", conn->getPeerAddress().toString().c_str()); + ++self->sessionsIn; + sessions.add(doSession(self, conn)); + } catch(Error &e) { + printf("Unexpected error: %s on listener %s\n", e.what(), listener->getListenAddress().toString().c_str()); + } + } + } + + ACTOR static Future run_impl(P2PNetworkTest *self) { + state ActorCollection actors(false); + self->startTime = now(); + + printf("Starting. %d listeners, %d remotes, %d outgoing connections\n", self->listeners.size(), self->remotes.size(), self->connectionsOut); + + for(auto n : self->remotes) { + printf("Remote: %s\n", n.toString().c_str()); + } + + for(auto el : self->listeners) { + printf("Listener: %s\n", el->getListenAddress().toString().c_str()); + actors.add(incoming(self, el)); + } + + for(int i = 0; i < self->connectionsOut; ++i) { + actors.add(outgoing(self)); + } + + loop { + wait(delay(1.0)); + printf("%s\n", self->statsString().c_str()); + } + } + + Future run() { + return run_impl(this); + } + +}; + +int getEnvInt(const char *name, int defaultValue = 0) { + const char *val = getenv(name); + return val != nullptr ? atol(val) : defaultValue; +} + +std::string getEnvStr(const char *name, std::string defaultValue = "") { + const char *val = getenv(name); + return val != nullptr ? val : defaultValue; +} + +// TODO: Remove this hacky thing and make a "networkp2ptest" role in fdbserver +TEST_CASE("!p2ptest") { + state P2PNetworkTest p2p( + getEnvStr("listenerAddresses"), + getEnvStr("remoteAddresses"), + getEnvInt("connectionsOut"), + {getEnvInt("minMsgBytes"), getEnvInt("maxMsgBytes")}, + {getEnvInt("minIdleMilliseconds"), getEnvInt("maxIdleMilliseconds")} + ); + + wait(p2p.run()); + return Void(); +}