foundationdb/fdbserver/networktest.actor.cpp

474 lines
13 KiB
C++
Raw Normal View History

2017-05-26 04:48:44 +08:00
/*
* networktest.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
2017-05-26 04:48:44 +08:00
* http://www.apache.org/licenses/LICENSE-2.0
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/NetworkTest.h"
#include "flow/Knobs.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/ActorCollection.h"
#include "flow/UnitTest.h"
#include <inttypes.h>
2017-05-26 04:48:44 +08:00
UID WLTOKEN_NETWORKTEST( -1, 2 );
2020-02-20 02:55:05 +08:00
struct LatencyStats {
using sample = double;
double x = 0;
double x2 = 0;
double n = 0;
sample tick() {
// now() returns the timestamp when we were scheduled; count
// all that time against this sample.
return now();
}
void tock(sample tick) {
// time_monotonic returns the timestamp when it was called;
// count the time it took us to be dispatched and invoke
// timer_monotonic
double delta = timer_monotonic() - tick;
x += delta;
x2 += (delta * delta);
n++;
}
2020-02-20 02:55:05 +08:00
void reset() { *this = LatencyStats(); }
double count() { return n; }
double mean() { return x / n; }
double stddev() { return sqrt(x2 / n - (x / n) * (x / n)); }
};
2017-05-26 04:48:44 +08:00
NetworkTestInterface::NetworkTestInterface( NetworkAddress remote )
: test( Endpoint({remote}, WLTOKEN_NETWORKTEST) )
2017-05-26 04:48:44 +08:00
{
}
NetworkTestInterface::NetworkTestInterface( INetwork* local )
{
test.makeWellKnownEndpoint( WLTOKEN_NETWORKTEST, TaskPriority::DefaultEndpoint );
2017-05-26 04:48:44 +08:00
}
ACTOR Future<Void> networkTestServer() {
state NetworkTestInterface interf( g_network );
state Future<Void> logging = delay( 1.0 );
state double lastTime = now();
state int sent = 0;
2020-02-20 02:55:05 +08:00
state LatencyStats latency;
2017-05-26 04:48:44 +08:00
loop {
choose {
when( NetworkTestRequest req = waitNext( interf.test.getFuture() ) ) {
2020-02-20 02:55:05 +08:00
LatencyStats::sample sample = latency.tick();
2017-05-26 04:48:44 +08:00
req.reply.send( NetworkTestReply( Value( std::string( req.replySize, '.' ) ) ) );
latency.tock(sample);
2017-05-26 04:48:44 +08:00
sent++;
}
when( wait( logging ) ) {
2017-05-26 04:48:44 +08:00
auto spd = sent / (now() - lastTime);
if (FLOW_KNOBS->NETWORK_TEST_SCRIPT_MODE) {
2020-02-20 02:55:05 +08:00
fprintf(stderr, "%f\t%.3f\t%.3f\n", spd, latency.mean() * 1e6, latency.stddev() * 1e6);
} else {
2020-02-20 02:55:05 +08:00
fprintf(stderr, "responses per second: %f (%f us)\n", spd, latency.mean() * 1e6);
}
latency.reset();
2017-05-26 04:48:44 +08:00
lastTime = now();
sent = 0;
logging = delay( 1.0 );
}
}
}
}
2020-02-20 02:55:05 +08:00
static bool moreRequestsPending(int count) {
if (count == -1) {
return false;
} else {
int request_count = FLOW_KNOBS->NETWORK_TEST_REQUEST_COUNT;
return (!request_count) || count < request_count;
}
}
2020-02-20 02:55:05 +08:00
static bool moreLoggingNeeded(int count, int iteration) {
if (FLOW_KNOBS->NETWORK_TEST_SCRIPT_MODE) {
return iteration <= 2;
} else {
2020-02-20 02:55:05 +08:00
return moreRequestsPending(count);
}
}
2020-02-20 02:55:05 +08:00
ACTOR Future<Void> testClient(std::vector<NetworkTestInterface> interfs, int* sent, int* completed,
LatencyStats* latency) {
state std::string request_payload(FLOW_KNOBS->NETWORK_TEST_REQUEST_SIZE, '.');
state LatencyStats::sample sample;
2020-02-20 02:55:05 +08:00
while (moreRequestsPending(*sent)) {
2017-05-26 04:48:44 +08:00
(*sent)++;
sample = latency->tick();
2020-02-20 02:55:05 +08:00
NetworkTestReply rep = wait(
retryBrokenPromise(interfs[deterministicRandom()->randomInt(0, interfs.size())].test,
NetworkTestRequest(StringRef(request_payload), FLOW_KNOBS->NETWORK_TEST_REPLY_SIZE)));
latency->tock(sample);
(*completed)++;
2017-05-26 04:48:44 +08:00
}
return Void();
2017-05-26 04:48:44 +08:00
}
2020-02-20 02:55:05 +08:00
ACTOR Future<Void> logger(int* sent, int* completed, LatencyStats* latency) {
2017-05-26 04:48:44 +08:00
state double lastTime = now();
state int logged = 0;
state int iteration = 0;
2020-02-20 02:55:05 +08:00
while (moreLoggingNeeded(logged, ++iteration)) {
wait( delay(1.0) );
2020-02-20 02:55:05 +08:00
auto spd = (*completed - logged) / (now() - lastTime);
if (FLOW_KNOBS->NETWORK_TEST_SCRIPT_MODE) {
if (iteration == 2) {
// We don't report the first iteration because of warm-up effects.
2020-02-20 02:55:05 +08:00
printf("%f\t%.3f\t%.3f\n", spd, latency->mean() * 1e6, latency->stddev() * 1e6);
}
} else {
2020-02-20 02:55:05 +08:00
fprintf(stderr, "messages per second: %f (%6.3f us)\n", spd, latency->mean() * 1e6);
}
latency->reset();
2017-05-26 04:48:44 +08:00
lastTime = now();
logged = *completed;
2017-05-26 04:48:44 +08:00
}
// tell the clients to shut down
*sent = -1;
return Void();
2017-05-26 04:48:44 +08:00
}
static void networkTestnanosleep()
{
printf("nanosleep speed test\n");
#ifdef __linux__
printf("\nnanosleep(10) latencies:");
for (int i = 0; i < 10; i++) {
double before = timer_monotonic();
timespec tv;
tv.tv_sec = 0;
tv.tv_nsec = 10;
nanosleep(&tv, NULL);
double after = timer_monotonic();
printf(" %0.3lf", (after - before)*1e6);
}
printf("\nnanosleep(10) latency after 5ms spin:");
for (int i = 0; i < 10; i++) {
double a = timer_monotonic() + 5e-3;
2018-04-07 07:50:58 +08:00
while (timer_monotonic() < a) {}
2017-05-26 04:48:44 +08:00
double before = timer_monotonic();
timespec tv;
tv.tv_sec = 0;
tv.tv_nsec = 10;
nanosleep(&tv, NULL);
double after = timer_monotonic();
printf(" %0.3lf", (after - before)*1e6);
}
printf("\nnanosleep(20000) latency:");
for (int i = 0; i < 10; i++) {
double before = timer_monotonic();
timespec tv;
tv.tv_sec = 0;
tv.tv_nsec = 20000;
nanosleep(&tv, NULL);
double after = timer_monotonic();
printf(" %0.3lf", (after - before)*1e6);
}
printf("\n");
printf("nanosleep(20000) loop\n");
while (true) {
timespec tv;
tv.tv_sec = 0;
tv.tv_nsec = 20000;
nanosleep(&tv, NULL);
}
#endif
return;
}
ACTOR Future<Void> networkTestClient( std:: string testServers ) {
if (testServers == "nanosleep") {
networkTestnanosleep();
//return Void();
}
state std::vector<NetworkTestInterface> interfs;
state std::vector<NetworkAddress> servers = NetworkAddress::parseList(testServers);
state int sent = 0;
state int completed = 0;
2020-02-20 02:55:05 +08:00
state LatencyStats latency;
2017-05-26 04:48:44 +08:00
for( int i = 0; i < servers.size(); i++ ) {
interfs.push_back( NetworkTestInterface( servers[i] ) );
}
state std::vector<Future<Void>> clients;
2020-02-20 02:55:05 +08:00
for (int i = 0; i < FLOW_KNOBS->NETWORK_TEST_CLIENT_COUNT; i++) {
clients.push_back(testClient(interfs, &sent, &completed, &latency));
}
2020-02-20 02:55:05 +08:00
clients.push_back(logger(&sent, &completed, &latency));
2017-05-26 04:48:44 +08:00
wait( waitForAll( clients ) );
2017-05-26 04:48:44 +08:00
return Void();
2018-04-07 07:50:58 +08:00
}
struct RandomIntRange {
int min;
int max;
int get() const {
return nondeterministicRandom()->randomInt(min, max + 1);
}
};
struct P2PNetworkTest {
std::vector<Reference<IListener>> listeners;
std::vector<NetworkAddress> remotes;
int connectionsOut;
RandomIntRange msgBytes;
RandomIntRange idleMilliseconds;
double startTime;
int64_t bytesSent;
int64_t bytesReceived;
int sessionsIn;
int sessionsOut;
Standalone<StringRef> msgBuffer;
int sendRecvSize;
2020-06-23 04:37:13 +08:00
std::string statsString() {
double elapsed = now() - startTime;
2020-06-23 04:37:13 +08:00
std::string s = format("%.2f MB/s bytes in %.2f MB/s bytes out %.2f/s completed sessions in %.2f/s completed sessions out",
bytesReceived / elapsed / 1e6, bytesSent / elapsed / 1e6, sessionsIn / elapsed, sessionsOut / elapsed);
2020-06-23 04:37:13 +08:00
bytesSent = 0;
bytesReceived = 0;
sessionsIn = 0;
sessionsOut = 0;
startTime = now();
return s;
}
P2PNetworkTest() {}
P2PNetworkTest(std::string listenerAddresses, std::string remoteAddresses, int connectionsOut, RandomIntRange msgBytes, RandomIntRange idleMilliseconds, int sendRecvSize)
: connectionsOut(connectionsOut), msgBytes(msgBytes), idleMilliseconds(idleMilliseconds), sendRecvSize(sendRecvSize) {
bytesSent = 0;
bytesReceived = 0;
sessionsIn = 0;
sessionsOut = 0;
msgBuffer = nondeterministicRandom()->randomAlphaNumeric(msgBytes.max);
if(!remoteAddresses.empty()) {
remotes = NetworkAddress::parseList(remoteAddresses);
}
if(!listenerAddresses.empty()) {
for(auto a : NetworkAddress::parseList(listenerAddresses)) {
listeners.push_back(INetworkConnections::net()->listen(a));
}
}
}
NetworkAddress randomRemote() {
return remotes[nondeterministicRandom()->randomInt(0, remotes.size())];
}
ACTOR static Future<Void> readMsg(P2PNetworkTest *self, Reference<IConnection> conn) {
state Standalone<StringRef> buffer = makeString(self->sendRecvSize);
state int bytesToRead = sizeof(int);
state int writeOffset = 0;
state bool gotHeader = false;
// Fill buffer sequentially until the initial bytesToRead is read (or more), then read
// intended message size and add it to bytesToRead, continue if needed until bytesToRead is 0.
loop {
int len = conn->read((uint8_t *)buffer.begin() + writeOffset, (uint8_t *)buffer.end());
bytesToRead -= len;
self->bytesReceived += len;
writeOffset += len;
// If no size header yet but there are enough bytes, read the size
if(!gotHeader && bytesToRead <= 0) {
gotHeader = true;
bytesToRead += *(int *)buffer.begin();
}
if(gotHeader) {
if(bytesToRead == 0) {
break;
}
ASSERT(bytesToRead > 0);
writeOffset = 0;
}
if(len == 0) {
wait(conn->onReadable());
wait( delay( 0, TaskPriority::ReadSocket ) );
}
}
return Void();
}
ACTOR static Future<Void> writeMsg(P2PNetworkTest *self, Reference<IConnection> conn) {
state UnsentPacketQueue packets;
state int msgSize = self->msgBytes.get();
PacketWriter writer(packets.getWriteBuffer(), nullptr, Unversioned());
writer.serializeBinaryItem(msgSize);
writer.serializeBytes(self->msgBuffer.substr(0, msgSize));
loop {
wait(conn->onWritable());
wait( delay( 0, TaskPriority::WriteSocket ) );
int len = conn->write(packets.getUnsent(), self->sendRecvSize);
self->bytesSent += len;
packets.sent(len);
if(packets.empty())
break;
}
return Void();
}
2020-06-23 04:37:13 +08:00
ACTOR static Future<Void> doSession(P2PNetworkTest *self, Reference<IConnection> conn, bool accept) {
try {
2020-06-23 04:37:13 +08:00
if(accept) {
wait(conn->acceptHandshake());
} else {
wait(conn->connectHandshake());
}
wait(readMsg(self, conn) && writeMsg(self, conn));
wait(delay(self->idleMilliseconds.get() / 1e3));
conn->close();
if(accept) {
++self->sessionsIn;
} else {
++self->sessionsOut;
}
} catch(Error &e) {
printf("doSession: error %s on remote %s\n", e.what(), conn->getPeerAddress().toString().c_str());
}
return Void();
}
ACTOR static Future<Void> outgoing(P2PNetworkTest *self) {
loop {
wait(delay(0, TaskPriority::WriteSocket));
state NetworkAddress remote = self->randomRemote();
try {
state Reference<IConnection> conn = wait(INetworkConnections::net()->connect(remote));
//printf("Connected to %s\n", remote.toString().c_str());
2020-06-23 04:37:13 +08:00
wait(doSession(self, conn, false));
} catch(Error &e) {
printf("outgoing: error %s on remote %s\n", e.what(), remote.toString().c_str());
}
}
}
ACTOR static Future<Void> incoming(P2PNetworkTest *self, Reference<IListener> listener) {
state ActorCollection sessions(false);
loop {
wait(delay(0, TaskPriority::AcceptSocket));
try {
state Reference<IConnection> conn = wait(listener->accept());
//printf("Connected from %s\n", conn->getPeerAddress().toString().c_str());
2020-06-23 04:37:13 +08:00
sessions.add(doSession(self, conn, true));
} catch(Error &e) {
printf("incoming: error %s on listener %s\n", e.what(), listener->getListenAddress().toString().c_str());
}
}
}
ACTOR static Future<Void> run_impl(P2PNetworkTest *self) {
state ActorCollection actors(false);
self->startTime = now();
printf("%d listeners, %d remotes, %d outgoing connections\n", self->listeners.size(), self->remotes.size(), self->connectionsOut);
printf("Message size %d to %d bytes\n", self->msgBytes.min, self->msgBytes.max);
printf("Post exchange delay %f to %f seconds\n", self->idleMilliseconds.min / 1e3, self->idleMilliseconds.max / 1e3);
printf("Send/Recv size %d bytes\n", self->sendRecvSize);
for(auto n : self->remotes) {
printf("Remote: %s\n", n.toString().c_str());
}
for(auto el : self->listeners) {
printf("Listener: %s\n", el->getListenAddress().toString().c_str());
actors.add(incoming(self, el));
}
if(!self->remotes.empty()) {
for(int i = 0; i < self->connectionsOut; ++i) {
actors.add(outgoing(self));
}
}
loop {
wait(delay(1.0));
printf("%s\n", self->statsString().c_str());
}
}
Future<Void> run() {
return run_impl(this);
}
};
int getEnvInt(const char *name, int defaultValue = 0) {
const char *val = getenv(name);
return val != nullptr ? atol(val) : defaultValue;
}
std::string getEnvStr(const char *name, std::string defaultValue = "") {
const char *val = getenv(name);
return val != nullptr ? val : defaultValue;
}
// TODO: Remove this hacky thing and make a "networkp2ptest" role in fdbserver
TEST_CASE("!p2ptest") {
state P2PNetworkTest p2p(
getEnvStr("listenerAddresses", "127.0.0.1:5000,127.0.0.1:5001:tls"),
getEnvStr("remoteAddresses", "127.0.0.1:5000,127.0.0.1:5001:tls"),
getEnvInt("connectionsOut", 2),
{getEnvInt("minMsgBytes", 0), getEnvInt("maxMsgBytes", 1000000)},
{getEnvInt("minIdleMilliseconds", 500), getEnvInt("maxIdleMilliseconds", 1000)},
getEnvInt("sendRecvSize", 32000)
);
wait(p2p.run());
return Void();
}