Merge pull request #2678 from sears/networktest_perf
Add some tuning knobs to networktestclient; also, measure latency directly
This commit is contained in:
commit
fe78524bbc
|
@ -24,6 +24,34 @@
|
|||
|
||||
UID WLTOKEN_NETWORKTEST( -1, 2 );
|
||||
|
||||
struct LatencyStats {
|
||||
using sample = double;
|
||||
double x = 0;
|
||||
double x2 = 0;
|
||||
double n = 0;
|
||||
|
||||
sample tick() {
|
||||
// now() returns the timestamp when we were scheduled; count
|
||||
// all that time against this sample.
|
||||
return now();
|
||||
}
|
||||
|
||||
void tock(sample tick) {
|
||||
// time_monotonic returns the timestamp when it was called;
|
||||
// count the time it took us to be dispatched and invoke
|
||||
// timer_monotonic
|
||||
double delta = timer_monotonic() - tick;
|
||||
x += delta;
|
||||
x2 += (delta * delta);
|
||||
n++;
|
||||
}
|
||||
|
||||
void reset() { *this = LatencyStats(); }
|
||||
double count() { return n; }
|
||||
double mean() { return x / n; }
|
||||
double stddev() { return sqrt(x2 / n - (x / n) * (x / n)); }
|
||||
};
|
||||
|
||||
NetworkTestInterface::NetworkTestInterface( NetworkAddress remote )
|
||||
: test( Endpoint({remote}, WLTOKEN_NETWORKTEST) )
|
||||
{
|
||||
|
@ -39,16 +67,24 @@ ACTOR Future<Void> networkTestServer() {
|
|||
state Future<Void> logging = delay( 1.0 );
|
||||
state double lastTime = now();
|
||||
state int sent = 0;
|
||||
state LatencyStats latency;
|
||||
|
||||
loop {
|
||||
choose {
|
||||
when( NetworkTestRequest req = waitNext( interf.test.getFuture() ) ) {
|
||||
LatencyStats::sample sample = latency.tick();
|
||||
req.reply.send( NetworkTestReply( Value( std::string( req.replySize, '.' ) ) ) );
|
||||
latency.tock(sample);
|
||||
sent++;
|
||||
}
|
||||
when( wait( logging ) ) {
|
||||
auto spd = sent / (now() - lastTime);
|
||||
fprintf( stderr, "responses per second: %f (%f us)\n", spd, 1e6/spd );
|
||||
if (FLOW_KNOBS->NETWORK_TEST_SCRIPT_MODE) {
|
||||
fprintf(stderr, "%f\t%.3f\t%.3f\n", spd, latency.mean() * 1e6, latency.stddev() * 1e6);
|
||||
} else {
|
||||
fprintf(stderr, "responses per second: %f (%f us)\n", spd, latency.mean() * 1e6);
|
||||
}
|
||||
latency.reset();
|
||||
lastTime = now();
|
||||
sent = 0;
|
||||
logging = delay( 1.0 );
|
||||
|
@ -57,24 +93,65 @@ ACTOR Future<Void> networkTestServer() {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> testClient( std::vector<NetworkTestInterface> interfs, int* sent ) {
|
||||
loop {
|
||||
NetworkTestReply rep = wait( retryBrokenPromise(interfs[deterministicRandom()->randomInt(0, interfs.size())].test, NetworkTestRequest( LiteralStringRef("."), FLOW_KNOBS->NETWORK_TEST_REPLY_SIZE ) ) );
|
||||
(*sent)++;
|
||||
static bool moreRequestsPending(int count) {
|
||||
if (count == -1) {
|
||||
return false;
|
||||
} else {
|
||||
int request_count = FLOW_KNOBS->NETWORK_TEST_REQUEST_COUNT;
|
||||
return (!request_count) || count < request_count;
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> logger( int* sent ) {
|
||||
state double lastTime = now();
|
||||
loop {
|
||||
wait( delay(1.0) );
|
||||
auto spd = *sent / (now() - lastTime);
|
||||
fprintf( stderr, "messages per second: %f\n", spd);
|
||||
lastTime = now();
|
||||
*sent = 0;
|
||||
static bool moreLoggingNeeded(int count, int iteration) {
|
||||
if (FLOW_KNOBS->NETWORK_TEST_SCRIPT_MODE) {
|
||||
return iteration <= 2;
|
||||
} else {
|
||||
return moreRequestsPending(count);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> testClient(std::vector<NetworkTestInterface> interfs, int* sent, int* completed,
|
||||
LatencyStats* latency) {
|
||||
state std::string request_payload(FLOW_KNOBS->NETWORK_TEST_REQUEST_SIZE, '.');
|
||||
state int count = FLOW_KNOBS->NETWORK_TEST_REQUEST_COUNT;
|
||||
state LatencyStats::sample sample;
|
||||
|
||||
while (moreRequestsPending(*sent)) {
|
||||
(*sent)++;
|
||||
sample = latency->tick();
|
||||
NetworkTestReply rep = wait(
|
||||
retryBrokenPromise(interfs[deterministicRandom()->randomInt(0, interfs.size())].test,
|
||||
NetworkTestRequest(StringRef(request_payload), FLOW_KNOBS->NETWORK_TEST_REPLY_SIZE)));
|
||||
latency->tock(sample);
|
||||
(*completed)++;
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> logger(int* sent, int* completed, LatencyStats* latency) {
|
||||
state double lastTime = now();
|
||||
state int logged = 0;
|
||||
state int iteration = 0;
|
||||
while (moreLoggingNeeded(logged, ++iteration)) {
|
||||
wait( delay(1.0) );
|
||||
auto spd = (*completed - logged) / (now() - lastTime);
|
||||
if (FLOW_KNOBS->NETWORK_TEST_SCRIPT_MODE) {
|
||||
if (iteration == 2) {
|
||||
// We don't report the first iteration because of warm-up effects.
|
||||
printf("%f\t%.3f\t%.3f\n", spd, latency->mean() * 1e6, latency->stddev() * 1e6);
|
||||
}
|
||||
} else {
|
||||
fprintf(stderr, "messages per second: %f (%6.3f us)\n", spd, latency->mean() * 1e6);
|
||||
}
|
||||
latency->reset();
|
||||
lastTime = now();
|
||||
logged = *completed;
|
||||
}
|
||||
// tell the clients to shut down
|
||||
*sent = -1;
|
||||
return Void();
|
||||
}
|
||||
|
||||
static void networkTestnanosleep()
|
||||
{
|
||||
printf("nanosleep speed test\n");
|
||||
|
@ -104,7 +181,6 @@ static void networkTestnanosleep()
|
|||
tv.tv_nsec = 10;
|
||||
nanosleep(&tv, NULL);
|
||||
double after = timer_monotonic();
|
||||
|
||||
printf(" %0.3lf", (after - before)*1e6);
|
||||
}
|
||||
|
||||
|
@ -139,19 +215,21 @@ ACTOR Future<Void> networkTestClient( std:: string testServers ) {
|
|||
//return Void();
|
||||
}
|
||||
|
||||
|
||||
state std::vector<NetworkTestInterface> interfs;
|
||||
state std::vector<NetworkAddress> servers = NetworkAddress::parseList(testServers);
|
||||
state int sent = 0;
|
||||
state int completed = 0;
|
||||
state LatencyStats latency;
|
||||
|
||||
for( int i = 0; i < servers.size(); i++ ) {
|
||||
interfs.push_back( NetworkTestInterface( servers[i] ) );
|
||||
}
|
||||
|
||||
state std::vector<Future<Void>> clients;
|
||||
for( int i = 0; i < 30; i++ )
|
||||
clients.push_back( testClient( interfs, &sent ) );
|
||||
clients.push_back( logger( &sent ) );
|
||||
for (int i = 0; i < FLOW_KNOBS->NETWORK_TEST_CLIENT_COUNT; i++) {
|
||||
clients.push_back(testClient(interfs, &sent, &completed, &latency));
|
||||
}
|
||||
clients.push_back(logger(&sent, &completed, &latency));
|
||||
|
||||
wait( waitForAll( clients ) );
|
||||
return Void();
|
||||
|
|
|
@ -79,7 +79,11 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
|
|||
init( TLS_SERVER_CONNECTION_THROTTLE_ATTEMPTS, 1 );
|
||||
init( TLS_CLIENT_CONNECTION_THROTTLE_ATTEMPTS, 0 );
|
||||
|
||||
init( NETWORK_TEST_CLIENT_COUNT, 30 );
|
||||
init( NETWORK_TEST_REPLY_SIZE, 600e3 );
|
||||
init( NETWORK_TEST_REQUEST_COUNT, 0 ); // 0 -> run forever
|
||||
init( NETWORK_TEST_REQUEST_SIZE, 1 );
|
||||
init( NETWORK_TEST_SCRIPT_MODE, false );
|
||||
|
||||
//AsyncFileCached
|
||||
init( PAGE_CACHE_4K, 2LL<<30 );
|
||||
|
|
|
@ -95,8 +95,12 @@ public:
|
|||
int TLS_SERVER_CONNECTION_THROTTLE_ATTEMPTS;
|
||||
int TLS_CLIENT_CONNECTION_THROTTLE_ATTEMPTS;
|
||||
|
||||
int NETWORK_TEST_CLIENT_COUNT;
|
||||
int NETWORK_TEST_REPLY_SIZE;
|
||||
|
||||
int NETWORK_TEST_REQUEST_COUNT;
|
||||
int NETWORK_TEST_REQUEST_SIZE;
|
||||
bool NETWORK_TEST_SCRIPT_MODE;
|
||||
|
||||
//AsyncFileCached
|
||||
int64_t PAGE_CACHE_4K;
|
||||
int64_t PAGE_CACHE_64K;
|
||||
|
|
Loading…
Reference in New Issue