benchmarks: Add byte throughput benchmark to TransportBenchmark

This is equivalent to UnaryCallResponseBandwidthBenchmark and
StreamingResponseBandwidthBenchmark, although without the interface selection
logic (which allows for traffic shaping).
Eric Anderson 2018-02-17 12:01:46 -08:00
parent 0aaaacec83
commit 8d18dbd501
4 changed files with 132 additions and 251 deletions
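For context (not part of this commit): the new methods are plain JMH benchmarks, so they can be run selectively with the standard JMH runner API. A minimal sketch follows; the wrapper class name is hypothetical, and the include regex assumes the unaryCallsByteThroughput/streamingCallsByteThroughput method names introduced in this diff.

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class TransportThroughputMain {
  public static void main(String[] args) throws RunnerException {
    // Select only the two *ByteThroughput benchmarks added by this commit.
    Options options = new OptionsBuilder()
        .include("TransportBenchmark\\..*ByteThroughput")
        .build();
    new Runner(options).run();
  }
}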

@@ -0,0 +1,45 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.benchmarks;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
final class ThreadlessExecutor implements Executor {
private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
/**
* Waits until there is a Runnable, then executes it and all queued Runnables after it.
*/
public void waitAndDrain() throws InterruptedException {
Runnable runnable = queue.take();
while (runnable != null) {
try {
runnable.run();
} catch (Throwable t) {
throw new RuntimeException("Runnable threw exception", t);
}
runnable = queue.poll();
}
}
@Override
public void execute(Runnable runnable) {
queue.add(runnable);
}
}
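For orientation (not part of this commit): ThreadlessExecutor lets the thread that calls waitAndDrain() run the channel's callbacks itself, avoiding a thread hop; PingPongStreamState further down relies on exactly that. A minimal sketch of the pattern, using a hypothetical demo class in the same package:

package io.grpc.benchmarks;

/** Hypothetical demo, not part of the commit: work queued from any thread runs on
 * whichever thread later calls waitAndDrain(). */
final class ThreadlessExecutorDemo {
  public static void main(String[] args) throws InterruptedException {
    final ThreadlessExecutor executor = new ThreadlessExecutor();
    executor.execute(new Runnable() {
      @Override public void run() {
        System.out.println("ran on " + Thread.currentThread().getName());
      }
    });
    executor.waitAndDrain(); // prints "main": the queued Runnable ran inline, not on a pool thread
  }
}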

@@ -34,6 +34,7 @@ import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.okhttp.OkHttpChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.channel.Channel;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
@@ -47,12 +48,14 @@ import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
/** Some text. */
@State(Scope.Benchmark)
@@ -69,6 +72,7 @@ public class TransportBenchmark {
private ManagedChannel channel;
private Server server;
private BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub;
private BenchmarkServiceGrpc.BenchmarkServiceStub asyncStub;
private EventLoopGroup groupToShutdown;
@Setup
@@ -157,6 +161,7 @@ public class TransportBenchmark {
server.start();
channel = channelBuilder.build();
stub = BenchmarkServiceGrpc.newBlockingStub(channel);
asyncStub = BenchmarkServiceGrpc.newStub(channel);
// Wait for channel to start
stub.unaryCall(SimpleRequest.getDefaultInstance());
}
@@ -182,7 +187,7 @@
}
}
private SimpleRequest simpleRequest = SimpleRequest.newBuilder()
private static final SimpleRequest UNARY_CALL_1024_REQUEST = SimpleRequest.newBuilder()
.setResponseSize(1024)
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[1024])))
.build();
@@ -190,7 +195,86 @@
@Benchmark
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public SimpleResponse unaryCall1024() {
return stub.unaryCall(simpleRequest);
public SimpleResponse unaryCall1024Latency() {
return stub.unaryCall(UNARY_CALL_1024_REQUEST);
}
private static final int BYTE_THROUGHPUT_RESPONSE_SIZE = 1048576;
private static final SimpleRequest BYTE_THROUGHPUT_REQUEST = SimpleRequest.newBuilder()
.setResponseSize(BYTE_THROUGHPUT_RESPONSE_SIZE)
.build();
@Benchmark
@BenchmarkMode(Mode.Throughput)
@OperationsPerInvocation(BYTE_THROUGHPUT_RESPONSE_SIZE)
@Threads(10)
public SimpleResponse unaryCallsByteThroughput() {
return stub.unaryCall(BYTE_THROUGHPUT_REQUEST);
}
private static final Throwable OK_THROWABLE = new RuntimeException("OK");
@State(Scope.Thread)
public static class PingPongStreamState {
private final ThreadlessExecutor executor = new ThreadlessExecutor();
private StreamObserver<SimpleRequest> requestObserver;
private SimpleResponse response;
private Throwable status;
@Setup
public void setUp(TransportBenchmark bench) {
requestObserver = bench.asyncStub
.withExecutor(executor)
.streamingCall(new StreamObserver<SimpleResponse>() {
@Override public void onNext(SimpleResponse next) {
assert response == null;
response = next;
}
@Override public void onError(Throwable t) {
status = t;
}
@Override public void onCompleted() {
status = OK_THROWABLE;
}
});
}
/** Issues request and waits for response. */
public SimpleResponse pingPong(SimpleRequest request) throws InterruptedException {
requestObserver.onNext(request);
while (true) {
executor.waitAndDrain();
if (response != null) {
SimpleResponse savedResponse = response;
response = null;
return savedResponse;
}
if (status != null) {
throw new RuntimeException("Unexpected stream termination", status);
}
}
}
@TearDown
public void tearDown() throws InterruptedException {
requestObserver.onCompleted();
while (status == null) {
executor.waitAndDrain();
}
if (status != OK_THROWABLE) {
throw new RuntimeException("Non-graceful stream shutdown", status);
}
}
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
@OperationsPerInvocation(BYTE_THROUGHPUT_RESPONSE_SIZE)
@Threads(10)
public SimpleResponse streamingCallsByteThroughput(PingPongStreamState state)
throws InterruptedException {
return state.pingPong(BYTE_THROUGHPUT_REQUEST);
}
}
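A note on reading the results (not part of the commit): because each call is credited with OperationsPerInvocation = BYTE_THROUGHPUT_RESPONSE_SIZE = 1,048,576 operations, the ops/s score JMH reports for the two *ByteThroughput benchmarks is effectively response-payload bytes per second. A hedged helper (hypothetical class) for comparing that score against the megabitsPerSecond aux counter of the two deleted bandwidth benchmarks:

final class ThroughputUnits {
  /** Converts a bytes-per-second score (what the *ByteThroughput benchmarks report as ops/s)
   * into the binary megabits-per-second figure the deleted benchmarks exposed via AuxCounters. */
  static double toMegabitsPerSecond(double bytesPerSecond) {
    return bytesPerSecond * 8 / (1 << 20); // bytes -> bits, then divide by 2^20
  }

  public static void main(String[] args) {
    // Example: 1,000 calls/s of 1 MiB responses = 1,048,576,000 bytes/s = 8,000 Mbit/s.
    System.out.println(toMegabitsPerSecond(1_000.0 * 1_048_576));
  }
}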

@@ -1,128 +0,0 @@
/*
* Copyright 2015 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.benchmarks.netty;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
/**
* Benchmark intended to test response bandwidth in bytes/sec for streaming calls by permuting
* payload size and flow-control windows with number of concurrent calls. Async stubs are used
* to avoid context-switching overheads.
*/
@State(Scope.Benchmark)
@Fork(1)
public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark {
@Param({"1", "10"})
public int maxConcurrentStreams = 1;
@Param({"LARGE", "JUMBO"})
public MessageSize responseSize = MessageSize.JUMBO;
@Param({"MEDIUM", "LARGE", "JUMBO"})
public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM;
private static AtomicLong callCounter;
private AtomicBoolean completed;
private AtomicBoolean record;
private CountDownLatch latch;
/**
* Use an AuxCounter so we can measure the calls as they occur without consuming CPU
* in the benchmark method.
*/
@AuxCounters
@State(Scope.Thread)
public static class AdditionalCounters {
@Setup(Level.Iteration)
public void clean() {
callCounter.set(0);
}
public long megabitsPerSecond() {
return (callCounter.get() * 8) >> 20;
}
}
/**
* Setup with direct executors and one channel.
*/
@Setup(Level.Trial)
public void setup() throws Exception {
super.setup(ExecutorType.DIRECT,
ExecutorType.DIRECT,
MessageSize.SMALL,
responseSize,
clientInboundFlowWindow,
ChannelType.NIO,
maxConcurrentStreams,
1);
callCounter = new AtomicLong();
completed = new AtomicBoolean();
record = new AtomicBoolean();
latch = startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, record, completed,
responseSize.bytes());
}
/**
* Stop the running calls then stop the server and client channels.
*/
@Override
@TearDown(Level.Trial)
public void teardown() throws Exception {
completed.set(true);
if (!latch.await(5, TimeUnit.SECONDS)) {
System.err.println("Failed to shutdown all calls.");
}
super.teardown();
}
/**
* Measure bandwidth of streamed responses.
*/
@Benchmark
public void stream(AdditionalCounters counters) throws Exception {
record.set(true);
// No need to do anything, just sleep here.
Thread.sleep(1001);
record.set(false);
}
/**
* Useful for triggering a subset of the benchmark in a profiler.
*/
public static void main(String[] argv) throws Exception {
StreamingResponseBandwidthBenchmark bench = new StreamingResponseBandwidthBenchmark();
bench.setup();
Thread.sleep(30000);
bench.teardown();
System.exit(0);
}
}

@@ -1,120 +0,0 @@
/*
* Copyright 2015 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.benchmarks.netty;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
/**
* Benchmark intended to test response bandwidth in bytes/sec for unary calls by permuting
* payload size and flow-control windows with number of concurrent calls. Async stubs are used
* to avoid context-switching overheads.
*/
@State(Scope.Benchmark)
@Fork(1)
public class UnaryCallResponseBandwidthBenchmark extends AbstractBenchmark {
@Param({"1", "10"})
public int maxConcurrentStreams = 1;
@Param({"LARGE", "JUMBO"})
public MessageSize responseSize = MessageSize.JUMBO;
@Param({"MEDIUM", "LARGE", "JUMBO"})
public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM;
private static AtomicLong callCounter;
private AtomicBoolean completed;
/**
* Use an AuxCounter so we can measure the calls as they occur without consuming CPU
* in the benchmark method.
*/
@AuxCounters
@State(Scope.Thread)
public static class AdditionalCounters {
@Setup(Level.Iteration)
public void clean() {
callCounter.set(0);
}
public long megabitsPerSecond() {
// Convert bytes into megabits
return (callCounter.get() * 8) >> 20;
}
}
/**
* Setup with direct executors, small payloads and a large flow control window.
*/
@Setup(Level.Trial)
public void setup() throws Exception {
super.setup(ExecutorType.DIRECT,
ExecutorType.DIRECT,
MessageSize.SMALL,
responseSize,
clientInboundFlowWindow,
ChannelType.NIO,
maxConcurrentStreams,
1);
callCounter = new AtomicLong();
completed = new AtomicBoolean();
startUnaryCalls(maxConcurrentStreams, callCounter, completed, responseSize.bytes());
}
/**
* Stop the running calls then stop the server and client channels.
*/
@Override
@TearDown(Level.Trial)
public void teardown() throws Exception {
completed.set(true);
Thread.sleep(5000);
super.teardown();
}
/**
* Measure bandwidth of unary call responses. The calls are already running; we just observe a
* counter of received responses.
*/
@Benchmark
public void unary(AdditionalCounters counters) throws Exception {
// No need to do anything, just sleep here.
Thread.sleep(1001);
}
/**
* Useful for triggering a subset of the benchmark in a profiler.
*/
public static void main(String[] argv) throws Exception {
UnaryCallResponseBandwidthBenchmark bench = new UnaryCallResponseBandwidthBenchmark();
bench.setup();
Thread.sleep(30000);
bench.teardown();
System.exit(0);
}
}