From 7724c644e5546af533727cafd49576e11297bc90 Mon Sep 17 00:00:00 2001
From: Russell Sears <russell_sears@apple.com>
Date: Mon, 10 Feb 2020 15:03:35 -0800
Subject: [PATCH 1/2] Add some tuning knobs to networktestclient; also, measure
 latency directly.

---
 fdbserver/networktest.actor.cpp | 122 +++++++++++++++++++++++++++-----
 flow/Knobs.cpp                  |   4 ++
 flow/Knobs.h                    |   4 ++
 3 files changed, 112 insertions(+), 18 deletions(-)

diff --git a/fdbserver/networktest.actor.cpp b/fdbserver/networktest.actor.cpp
index 413499e50b..4a301b8aa4 100644
--- a/fdbserver/networktest.actor.cpp
+++ b/fdbserver/networktest.actor.cpp
@@ -24,6 +24,44 @@
 
 UID WLTOKEN_NETWORKTEST( -1, 2 );
 
+struct latency_stats {
+	using sample = double;
+	double x = 0;
+	double x2 = 0;
+	double n = 0;
+
+	sample tick() {
+		// now() returns the timestamp when we were scheduled; count
+		// all that time against this sample.
+		return now();
+	}
+
+	void tock(sample tick) {
+		// timer_monotonic() returns the timestamp when it was called;
+		// count the time it took us to be dispatched and invoke
+		// timer_monotonic().
+		double delta = timer_monotonic() - tick;
+		x += delta;
+		x2 += (delta * delta);
+		n++;
+	}
+
+	void reset() {
+		*this = latency_stats();
+	}
+
+	double count() {
+		return n;
+	}
+
+	double mean() {
+		return x/n;
+	}
+	double stddev() {
+		return sqrt(x2/n - (x/n)*(x/n));
+	}
+};
+
 NetworkTestInterface::NetworkTestInterface( NetworkAddress remote )
 	: test( Endpoint({remote}, WLTOKEN_NETWORKTEST) )
 {
@@ -39,16 +77,25 @@ ACTOR Future<Void> networkTestServer() {
 	state Future<Void> logging = delay( 1.0 );
 	state double lastTime = now();
 	state int sent = 0;
+	state latency_stats latency;
+	state latency_stats::sample sample;
 
 	loop {
 		choose {
 			when( NetworkTestRequest req = waitNext( interf.test.getFuture() ) ) {
+				sample = latency.tick();
 				req.reply.send( NetworkTestReply( Value( std::string( req.replySize, '.' ) ) ) );
+				latency.tock(sample);
 				sent++;
 			}
 			when( wait( logging ) ) {
 				auto spd = sent / (now() - lastTime);
-				fprintf( stderr, "responses per second: %f (%f us)\n", spd, 1e6/spd );
+				if (FLOW_KNOBS->NETWORK_TEST_SCRIPT_MODE) {
+					fprintf( stderr, "%f\t%.3f\t%.3f\n", spd, latency.mean() * 1e6, latency.stddev() * 1e6 );
+				} else {
+					fprintf( stderr, "responses per second: %f (%f us)\n", spd, latency.mean() * 1e6 );
+				}
+				latency.reset();
 				lastTime = now();
 				sent = 0;
 				logging = delay( 1.0 );
@@ -57,24 +104,62 @@ ACTOR Future<Void> networkTestServer() {
 	}
 }
 
-ACTOR Future<Void> testClient( std::vector<NetworkTestInterface> interfs, int* sent ) {
-	loop {
-		NetworkTestReply rep = wait(  retryBrokenPromise(interfs[deterministicRandom()->randomInt(0, interfs.size())].test, NetworkTestRequest( LiteralStringRef("."), FLOW_KNOBS->NETWORK_TEST_REPLY_SIZE ) ) );
-		(*sent)++;
+static bool more_requests_pending(int count) {
+	if (count == -1) {
+		return false;
+	} else {
+		int request_count = FLOW_KNOBS->NETWORK_TEST_REQUEST_COUNT;
+		return (!request_count) || count < request_count;
 	}
 }
 
-ACTOR Future<Void> logger( int* sent ) {
-	state double lastTime = now();
-	loop {
-		wait( delay(1.0) );
-		auto spd = *sent / (now() - lastTime);
-		fprintf( stderr, "messages per second: %f\n", spd);
-		lastTime = now();
-		*sent = 0;
+static bool more_logging_needed(int count, int iteration) {
+	if (FLOW_KNOBS->NETWORK_TEST_SCRIPT_MODE) {
+		return iteration <= 2;
+	} else {
+		return more_requests_pending(count);
 	}
 }
 
+ACTOR Future<Void> testClient( std::vector<NetworkTestInterface> interfs, int* sent, int* completed, latency_stats * latency ) {
+	state std::string request_payload( FLOW_KNOBS->NETWORK_TEST_REQUEST_SIZE, '.');
+	state int count = FLOW_KNOBS->NETWORK_TEST_REQUEST_COUNT;
+	state latency_stats::sample sample;
+
+	while (more_requests_pending(*sent)) {
+		(*sent)++;
+		sample = latency->tick();
+		NetworkTestReply rep = wait(  retryBrokenPromise(interfs[deterministicRandom()->randomInt(0, interfs.size())].test, NetworkTestRequest( StringRef(request_payload), FLOW_KNOBS->NETWORK_TEST_REPLY_SIZE ) ) );
+		latency->tock(sample);
+		(*completed)++;
+	}
+	return Void();
+}
+
+ACTOR Future<Void> logger( int * sent, int* completed, latency_stats * latency ) {
+	state double lastTime = now();
+	state int logged = 0;
+	state int iteration = 0;
+	while (more_logging_needed(logged, ++iteration)) {
+		wait( delay(1.0) );
+		auto spd = (*completed-logged) / (now() - lastTime);
+		if (FLOW_KNOBS->NETWORK_TEST_SCRIPT_MODE) {
+			if (iteration == 2) {
+				// We don't report the first iteration because of warm-up effects.
+				printf( "%f\t%.3f\t%.3f\n", spd, latency->mean() * 1e6, latency->stddev() * 1e6);
+			}
+		} else {
+			fprintf( stderr, "messages per second: %f (%6.3f us)\n", spd, latency->mean() * 1e6);
+		}
+		latency->reset();
+		lastTime = now();
+		logged = *completed;
+	}
+	// tell the clients to shut down
+	*sent = -1;
+	return Void();
+}
+
 static void networkTestnanosleep()
 {
 	printf("nanosleep speed test\n");
@@ -104,7 +189,6 @@ static void networkTestnanosleep()
 		tv.tv_nsec = 10;
 		nanosleep(&tv, NULL);
 		double after = timer_monotonic();
-
 		printf(" %0.3lf", (after - before)*1e6);
 	}
 
@@ -139,19 +223,21 @@ ACTOR Future<Void> networkTestClient( std:: string testServers ) {
 		//return Void();
 	}
 
-
 	state std::vector<NetworkTestInterface> interfs;
 	state std::vector<NetworkAddress> servers = NetworkAddress::parseList(testServers);
 	state int sent = 0;
+	state int completed = 0;
+	state latency_stats latency;
 
 	for( int i = 0; i < servers.size(); i++ ) {
 		interfs.push_back( NetworkTestInterface( servers[i] ) );
 	}
 
 	state std::vector<Future<Void>> clients;
-	for( int i = 0; i < 30; i++ )
-		clients.push_back( testClient( interfs, &sent ) );
-	clients.push_back( logger( &sent ) );
+	for( int i = 0; i < FLOW_KNOBS->NETWORK_TEST_CLIENT_COUNT; i++ ) {
+		clients.push_back( testClient( interfs, &sent, &completed, &latency ) );
+	}
+	clients.push_back( logger( &sent, &completed, &latency ) );
 
 	wait( waitForAll( clients ) );
 	return Void();
diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp
index ed45ce7b83..80833e218e 100644
--- a/flow/Knobs.cpp
+++ b/flow/Knobs.cpp
@@ -80,7 +80,11 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
 	init( TLS_SERVER_CONNECTION_THROTTLE_ATTEMPTS,               1 );
 	init( TLS_CLIENT_CONNECTION_THROTTLE_ATTEMPTS,               0 );
 
+	init( NETWORK_TEST_CLIENT_COUNT,                            30 );
 	init( NETWORK_TEST_REPLY_SIZE,                           600e3 );
+	init( NETWORK_TEST_REQUEST_COUNT,                            0 ); // 0 -> run forever
+	init( NETWORK_TEST_REQUEST_SIZE,                             1 );
+	init( NETWORK_TEST_SCRIPT_MODE,                          false );
 
 	//AsyncFileCached
 	init( PAGE_CACHE_4K,                                   2LL<<30 );
diff --git a/flow/Knobs.h b/flow/Knobs.h
index a02fca7e15..77f2000241 100644
--- a/flow/Knobs.h
+++ b/flow/Knobs.h
@@ -96,7 +96,11 @@ public:
 	int TLS_SERVER_CONNECTION_THROTTLE_ATTEMPTS;
 	int TLS_CLIENT_CONNECTION_THROTTLE_ATTEMPTS;
 
+	int NETWORK_TEST_CLIENT_COUNT;
 	int NETWORK_TEST_REPLY_SIZE;
+	int NETWORK_TEST_REQUEST_COUNT;
+	int NETWORK_TEST_REQUEST_SIZE;
+	bool NETWORK_TEST_SCRIPT_MODE;
 	
 	//AsyncFileCached
 	int64_t PAGE_CACHE_4K;

From 956a3efa8005e0966bcd3c67a2390980f8bf26f3 Mon Sep 17 00:00:00 2001
From: Russell Sears <russell_sears@apple.com>
Date: Wed, 19 Feb 2020 10:55:05 -0800
Subject: [PATCH 2/2] Address pull request comments

---
 fdbserver/networktest.actor.cpp | 66 +++++++++++++++------------------
 flow/Knobs.h                    |  2 +-
 2 files changed, 30 insertions(+), 38 deletions(-)

diff --git a/fdbserver/networktest.actor.cpp b/fdbserver/networktest.actor.cpp
index 4a301b8aa4..feb59e9df6 100644
--- a/fdbserver/networktest.actor.cpp
+++ b/fdbserver/networktest.actor.cpp
@@ -24,7 +24,7 @@
 
 UID WLTOKEN_NETWORKTEST( -1, 2 );
 
-struct latency_stats {
+struct LatencyStats {
 	using sample = double;
 	double x = 0;
 	double x2 = 0;
@@ -46,20 +46,10 @@ struct latency_stats {
 		n++;
 	}
 
-	void reset() {
-		*this = latency_stats();
-	}
-
-	double count() {
-		return n;
-	}
-
-	double mean() {
-		return x/n;
-	}
-	double stddev() {
-		return sqrt(x2/n - (x/n)*(x/n));
-	}
+	void reset() { *this = LatencyStats(); }
+	double count() { return n; }
+	double mean() { return x / n; }
+	double stddev() { return sqrt(x2 / n - (x / n) * (x / n)); }
 };
 
 NetworkTestInterface::NetworkTestInterface( NetworkAddress remote )
@@ -77,13 +67,12 @@ ACTOR Future<Void> networkTestServer() {
 	state Future<Void> logging = delay( 1.0 );
 	state double lastTime = now();
 	state int sent = 0;
-	state latency_stats latency;
-	state latency_stats::sample sample;
+	state LatencyStats latency;
 
 	loop {
 		choose {
 			when( NetworkTestRequest req = waitNext( interf.test.getFuture() ) ) {
-				sample = latency.tick();
+				LatencyStats::sample sample = latency.tick();
 				req.reply.send( NetworkTestReply( Value( std::string( req.replySize, '.' ) ) ) );
 				latency.tock(sample);
 				sent++;
@@ -91,9 +80,9 @@ ACTOR Future<Void> networkTestServer() {
 			when( wait( logging ) ) {
 				auto spd = sent / (now() - lastTime);
 				if (FLOW_KNOBS->NETWORK_TEST_SCRIPT_MODE) {
-					fprintf( stderr, "%f\t%.3f\t%.3f\n", spd, latency.mean() * 1e6, latency.stddev() * 1e6 );
+					fprintf(stderr, "%f\t%.3f\t%.3f\n", spd, latency.mean() * 1e6, latency.stddev() * 1e6);
 				} else {
-					fprintf( stderr, "responses per second: %f (%f us)\n", spd, latency.mean() * 1e6 );
+					fprintf(stderr, "responses per second: %f (%f us)\n", spd, latency.mean() * 1e6);
 				}
 				latency.reset();
 				lastTime = now();
@@ -104,7 +93,7 @@ ACTOR Future<Void> networkTestServer() {
 	}
 }
 
-static bool more_requests_pending(int count) {
+static bool moreRequestsPending(int count) {
 	if (count == -1) {
 		return false;
 	} else {
@@ -113,43 +102,46 @@ static bool more_requests_pending(int count) {
 	}
 }
 
-static bool more_logging_needed(int count, int iteration) {
+static bool moreLoggingNeeded(int count, int iteration) {
 	if (FLOW_KNOBS->NETWORK_TEST_SCRIPT_MODE) {
 		return iteration <= 2;
 	} else {
-		return more_requests_pending(count);
+		return moreRequestsPending(count);
 	}
 }
 
-ACTOR Future<Void> testClient( std::vector<NetworkTestInterface> interfs, int* sent, int* completed, latency_stats * latency ) {
-	state std::string request_payload( FLOW_KNOBS->NETWORK_TEST_REQUEST_SIZE, '.');
+ACTOR Future<Void> testClient(std::vector<NetworkTestInterface> interfs, int* sent, int* completed,
+                              LatencyStats* latency) {
+	state std::string request_payload(FLOW_KNOBS->NETWORK_TEST_REQUEST_SIZE, '.');
 	state int count = FLOW_KNOBS->NETWORK_TEST_REQUEST_COUNT;
-	state latency_stats::sample sample;
+	state LatencyStats::sample sample;
 
-	while (more_requests_pending(*sent)) {
+	while (moreRequestsPending(*sent)) {
 		(*sent)++;
 		sample = latency->tick();
-		NetworkTestReply rep = wait(  retryBrokenPromise(interfs[deterministicRandom()->randomInt(0, interfs.size())].test, NetworkTestRequest( StringRef(request_payload), FLOW_KNOBS->NETWORK_TEST_REPLY_SIZE ) ) );
+		NetworkTestReply rep = wait(
+		    retryBrokenPromise(interfs[deterministicRandom()->randomInt(0, interfs.size())].test,
+		                       NetworkTestRequest(StringRef(request_payload), FLOW_KNOBS->NETWORK_TEST_REPLY_SIZE)));
 		latency->tock(sample);
 		(*completed)++;
 	}
 	return Void();
 }
 
-ACTOR Future<Void> logger( int * sent, int* completed, latency_stats * latency ) {
+ACTOR Future<Void> logger(int* sent, int* completed, LatencyStats* latency) {
 	state double lastTime = now();
 	state int logged = 0;
 	state int iteration = 0;
-	while (more_logging_needed(logged, ++iteration)) {
+	while (moreLoggingNeeded(logged, ++iteration)) {
 		wait( delay(1.0) );
-		auto spd = (*completed-logged) / (now() - lastTime);
+		auto spd = (*completed - logged) / (now() - lastTime);
 		if (FLOW_KNOBS->NETWORK_TEST_SCRIPT_MODE) {
 			if (iteration == 2) {
 				// We don't report the first iteration because of warm-up effects.
-				printf( "%f\t%.3f\t%.3f\n", spd, latency->mean() * 1e6, latency->stddev() * 1e6);
+				printf("%f\t%.3f\t%.3f\n", spd, latency->mean() * 1e6, latency->stddev() * 1e6);
 			}
 		} else {
-			fprintf( stderr, "messages per second: %f (%6.3f us)\n", spd, latency->mean() * 1e6);
+			fprintf(stderr, "messages per second: %f (%6.3f us)\n", spd, latency->mean() * 1e6);
 		}
 		latency->reset();
 		lastTime = now();
@@ -227,17 +219,17 @@ ACTOR Future<Void> networkTestClient( std:: string testServers ) {
 	state std::vector<NetworkAddress> servers = NetworkAddress::parseList(testServers);
 	state int sent = 0;
 	state int completed = 0;
-	state latency_stats latency;
+	state LatencyStats latency;
 
 	for( int i = 0; i < servers.size(); i++ ) {
 		interfs.push_back( NetworkTestInterface( servers[i] ) );
 	}
 
 	state std::vector<Future<Void>> clients;
-	for( int i = 0; i < FLOW_KNOBS->NETWORK_TEST_CLIENT_COUNT; i++ ) {
-		clients.push_back( testClient( interfs, &sent, &completed, &latency ) );
+	for (int i = 0; i < FLOW_KNOBS->NETWORK_TEST_CLIENT_COUNT; i++) {
+		clients.push_back(testClient(interfs, &sent, &completed, &latency));
 	}
-	clients.push_back( logger( &sent, &completed, &latency ) );
+	clients.push_back(logger(&sent, &completed, &latency));
 
 	wait( waitForAll( clients ) );
 	return Void();
diff --git a/flow/Knobs.h b/flow/Knobs.h
index 77f2000241..f6a5436826 100644
--- a/flow/Knobs.h
+++ b/flow/Knobs.h
@@ -101,7 +101,7 @@ public:
 	int NETWORK_TEST_REQUEST_COUNT;
 	int NETWORK_TEST_REQUEST_SIZE;
 	bool NETWORK_TEST_SCRIPT_MODE;
-	
+
 	//AsyncFileCached
 	int64_t PAGE_CACHE_4K;
 	int64_t PAGE_CACHE_64K;