Added a network connection test and very hacky interface to it. The test simultaneously listens on a set of addresses, connects to a random set of addresses to maintain a configured number of connections, and for each session executes a two way data exchange followed by a random delay and close.

This commit is contained in:
Steve Atherton 2020-06-22 03:37:16 -07:00
parent d681e9d350
commit 57344922a4
1 changed files with 212 additions and 0 deletions

View File

@ -21,6 +21,9 @@
#include "fdbserver/NetworkTest.h"
#include "flow/Knobs.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/ActorCollection.h"
#include "flow/UnitTest.h"
#include <inttypes.h>
UID WLTOKEN_NETWORKTEST( -1, 2 );
@ -233,3 +236,212 @@ ACTOR Future<Void> networkTestClient( std:: string testServers ) {
wait( waitForAll( clients ) );
return Void();
}
struct RandomIntRange {
int min;
int max;
int get() const {
return nondeterministicRandom()->randomInt(min, max + 1);
}
};
struct P2PNetworkTest {
std::vector<Reference<IListener>> listeners;
std::vector<NetworkAddress> remotes;
int connectionsOut;
RandomIntRange msgBytes;
RandomIntRange idleMilliseconds;
double startTime;
int64_t bytesSent;
int64_t bytesReceived;
int sessionsIn;
int sessionsOut;
Standalone<StringRef> msgBuffer;
std::string statsString() const {
double elapsed = now() - startTime;
return format("%.2f MB/s bytes in %.2f MB/s bytes out %.2f/s sessions in %.2f/s sessions out",
bytesReceived / elapsed / 1e6, bytesSent / elapsed / 1e6, sessionsIn / elapsed, sessionsOut / elapsed);
}
P2PNetworkTest() {}
P2PNetworkTest(std::string listenerAddresses, std::string remoteAddresses, int connectionsOut, RandomIntRange msgBytes, RandomIntRange idleMilliseconds)
: connectionsOut(connectionsOut), msgBytes(msgBytes), idleMilliseconds(idleMilliseconds) {
bytesSent = 0;
bytesReceived = 0;
sessionsIn = 0;
sessionsOut = 0;
msgBuffer = nondeterministicRandom()->randomAlphaNumeric(msgBytes.max);
if(!remoteAddresses.empty()) {
remotes = NetworkAddress::parseList(remoteAddresses);
}
if(!listenerAddresses.empty()) {
for(auto a : NetworkAddress::parseList(listenerAddresses)) {
listeners.push_back(INetworkConnections::net()->listen(a));
}
}
}
NetworkAddress randomRemote() {
return remotes[nondeterministicRandom()->randomInt(0, remotes.size())];
}
ACTOR static Future<Void> readMsg(P2PNetworkTest *self, Reference<IConnection> conn) {
state Standalone<StringRef> buffer = makeString(32000);
state int bytesToRead = sizeof(int);
state int writeOffset = 0;
state bool gotHeader = false;
loop {
int len = conn->read((uint8_t *)buffer.begin() + writeOffset, (uint8_t *)buffer.end());
bytesToRead -= len;
self->bytesReceived += len;
if(!gotHeader) {
if(bytesToRead <= 0) {
gotHeader = true;
bytesToRead += *(uint64_t *)buffer.begin();
writeOffset = 0;
}
} else {
if(bytesToRead == 0) {
break;
}
ASSERT(bytesToRead > 0);
writeOffset = 0;
}
if(len == 0) {
wait(conn->onReadable());
wait( delay( 0, TaskPriority::ReadSocket ) );
}
}
return Void();
}
ACTOR static Future<Void> writeMsg(P2PNetworkTest *self, Reference<IConnection> conn) {
state UnsentPacketQueue packets;
state int msgSize = self->msgBytes.get();
PacketWriter writer(packets.getWriteBuffer(), nullptr, Unversioned());
writer.serializeBinaryItem(msgSize);
writer.serializeBytes(self->msgBuffer.substr(0, msgSize));
loop {
wait(conn->onWritable());
wait( delay( 0, TaskPriority::WriteSocket ) );
int len = conn->write(packets.getUnsent(), 32000);
self->bytesSent += len;
packets.sent(len);
if(packets.empty())
break;
}
return Void();
}
ACTOR static Future<Void> doSession(P2PNetworkTest *self, Reference<IConnection> conn) {
state bool waiting = false;
try {
wait(readMsg(self, conn) && writeMsg(self, conn));
waiting = true;
wait(delay(self->idleMilliseconds.get() / 1e3));
conn->close();
} catch(Error &e) {
if(!waiting) {
printf("Unexpected error: %s on remote %s\n", e.what(), conn->getPeerAddress().toString().c_str());
throw e;
}
}
return Void();
}
ACTOR static Future<Void> outgoing(P2PNetworkTest *self) {
loop {
state NetworkAddress remote = self->randomRemote();
try {
Reference<IConnection> conn = wait(INetworkConnections::net()->connect(remote));
//printf("Connected to %s\n", remote.toString().c_str());
++self->sessionsOut;
wait(doSession(self, conn));
} catch(Error &e) {
printf("Unexpected error: %s on remote %s\n", e.what(), remote.toString().c_str());
}
}
}
ACTOR static Future<Void> incoming(P2PNetworkTest *self, Reference<IListener> listener) {
state ActorCollection sessions(false);
loop {
try {
Reference<IConnection> conn = wait(listener->accept());
//printf("Connected from %s\n", conn->getPeerAddress().toString().c_str());
++self->sessionsIn;
sessions.add(doSession(self, conn));
} catch(Error &e) {
printf("Unexpected error: %s on listener %s\n", e.what(), listener->getListenAddress().toString().c_str());
}
}
}
ACTOR static Future<Void> run_impl(P2PNetworkTest *self) {
state ActorCollection actors(false);
self->startTime = now();
printf("Starting. %d listeners, %d remotes, %d outgoing connections\n", self->listeners.size(), self->remotes.size(), self->connectionsOut);
for(auto n : self->remotes) {
printf("Remote: %s\n", n.toString().c_str());
}
for(auto el : self->listeners) {
printf("Listener: %s\n", el->getListenAddress().toString().c_str());
actors.add(incoming(self, el));
}
for(int i = 0; i < self->connectionsOut; ++i) {
actors.add(outgoing(self));
}
loop {
wait(delay(1.0));
printf("%s\n", self->statsString().c_str());
}
}
Future<Void> run() {
return run_impl(this);
}
};
int getEnvInt(const char *name, int defaultValue = 0) {
const char *val = getenv(name);
return val != nullptr ? atol(val) : defaultValue;
}
std::string getEnvStr(const char *name, std::string defaultValue = "") {
const char *val = getenv(name);
return val != nullptr ? val : defaultValue;
}
// TODO: Remove this hacky thing and make a "networkp2ptest" role in fdbserver
TEST_CASE("!p2ptest") {
state P2PNetworkTest p2p(
getEnvStr("listenerAddresses"),
getEnvStr("remoteAddresses"),
getEnvInt("connectionsOut"),
{getEnvInt("minMsgBytes"), getEnvInt("maxMsgBytes")},
{getEnvInt("minIdleMilliseconds"), getEnvInt("maxIdleMilliseconds")}
);
wait(p2p.run());
return Void();
}