benchmarks: Move message throughput benchmark to TransportBenchmark

This replaces FlowControlledMessagesPerSecondBenchmark, except it does not
avoid local flow control issues via request(5). With a request(5) hacked in,
this benchmark produces similar results (non-direct: 671k msg/s vs. the
previous 641k msg/s).
Eric Anderson 2018-02-17 22:51:06 -08:00
parent 8d18dbd501
commit daed6e01b1
3 changed files with 86 additions and 136 deletions
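For context, the request(5) mentioned in the commit message means asking the transport for several inbound messages at once instead of the blocking iterator's one-message-at-a-time requests. One way to approximate that with grpc-stub's public API is a ClientResponseObserver that disables automatic inbound flow control. The sketch below is only illustrative and is not part of this commit; the class and method names (RequestFiveSketch, startStreaming) are made up for the example, and it is not necessarily the hack the author used to get the 671k msg/s number.

import io.grpc.Channel;
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
import io.grpc.benchmarks.proto.Messages.SimpleRequest;
import io.grpc.benchmarks.proto.Messages.SimpleResponse;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative sketch only: streams responses while keeping several messages requested. */
final class RequestFiveSketch {
  static AtomicLong startStreaming(Channel channel) {
    final AtomicLong received = new AtomicLong();
    BenchmarkServiceGrpc.newStub(channel).streamingFromServer(
        SimpleRequest.getDefaultInstance(),
        new ClientResponseObserver<SimpleRequest, SimpleResponse>() {
          private ClientCallStreamObserver<SimpleRequest> requestStream;

          @Override
          public void beforeStart(ClientCallStreamObserver<SimpleRequest> stream) {
            requestStream = stream;
            stream.disableAutoInboundFlowControl();
            stream.request(5);  // keep up to 5 messages in flight instead of 1
          }

          @Override
          public void onNext(SimpleResponse value) {
            received.incrementAndGet();
            requestStream.request(1);  // replenish the request budget
          }

          @Override
          public void onError(Throwable t) {}

          @Override
          public void onCompleted() {}
        });
    return received;
  }
}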

io/grpc/benchmarks/CancellableInterceptor.java (new file)

@@ -0,0 +1,43 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.benchmarks;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;

/** Interceptor that lets you cancel the most recent call made. This class is not thread-safe. */
class CancellableInterceptor implements ClientInterceptor {
  private ClientCall<?, ?> call;

  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
      MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
    ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
    this.call = call;
    return call;
  }

  public void cancel(String message, Throwable cause) {
    if (call == null) {
      throw new NullPointerException("No previous call");
    }
    call.cancel(message, cause);
  }
}
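A minimal usage sketch, illustrative only and not part of this commit (it mirrors what the TransportBenchmark change below does; `channel` is assumed to be an existing ManagedChannel in the same package):

CancellableInterceptor cancellable = new CancellableInterceptor();
BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub =
    BenchmarkServiceGrpc.newBlockingStub(channel).withInterceptors(cancellable);
Iterator<SimpleResponse> responses =
    stub.streamingFromServer(SimpleRequest.getDefaultInstance());
SimpleResponse first = responses.next();          // receive at least one message
cancellable.cancel("done with the call", null);   // cancels the most recently created call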

io/grpc/benchmarks/TransportBenchmark.java

@@ -21,6 +21,8 @@ import static io.grpc.benchmarks.Utils.pickUnusedPort;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
import io.grpc.benchmarks.proto.Messages.Payload;
import io.grpc.benchmarks.proto.Messages.SimpleRequest;
@@ -43,6 +45,7 @@ import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
@@ -277,4 +280,44 @@ public class TransportBenchmark {
      throws InterruptedException {
    return state.pingPong(BYTE_THROUGHPUT_REQUEST);
  }
  @State(Scope.Thread)
  public static class InfiniteStreamState {
    private final CancellableInterceptor cancellableInterceptor = new CancellableInterceptor();
    private Iterator<SimpleResponse> iter;

    @Setup
    public void setUp(TransportBenchmark bench) {
      iter = bench.stub
          .withInterceptors(cancellableInterceptor)
          .streamingFromServer(SimpleRequest.getDefaultInstance());
    }

    public SimpleResponse recv() throws InterruptedException {
      return iter.next();
    }

    @TearDown
    public void tearDown() throws InterruptedException {
      cancellableInterceptor.cancel("Normal tear-down", null);
      try {
        // Need to drain the queue
        while (iter.hasNext()) {
          iter.next();
        }
      } catch (StatusRuntimeException ex) {
        if (!Status.Code.CANCELLED.equals(ex.getStatus().getCode())) {
          throw ex;
        }
      }
    }
  }

  @Benchmark
  @BenchmarkMode(Mode.Throughput)
  @Threads(10)
  public SimpleResponse streamingCallsMessageThroughput(InfiniteStreamState state)
      throws InterruptedException {
    return state.recv();
  }
}
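As an aside, the relocated benchmark can be run on its own through the standard JMH runner API. A minimal sketch follows; the runner class name (RunTransportBenchmark) is hypothetical, while the include pattern matches the benchmark method added above.

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class RunTransportBenchmark {
  public static void main(String[] args) throws RunnerException {
    // Select only the new message-throughput benchmark by regex.
    Options opt = new OptionsBuilder()
        .include("TransportBenchmark.streamingCallsMessageThroughput")
        .build();
    new Runner(opt).run();
  }
}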

io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java (deleted)

@@ -1,136 +0,0 @@
/*
* Copyright 2015 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.benchmarks.netty;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
/**
 * Benchmark measuring messages per second received from a streaming server. The server
 * obeys outbound flow control.
 */
@State(Scope.Benchmark)
@Fork(1)
public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark {
  private static final Logger logger =
      Logger.getLogger(FlowControlledMessagesPerSecondBenchmark.class.getName());

  @Param({"1", "2", "4"})
  public int channelCount = 1;

  @Param({"1", "2", "10", "100"})
  public int maxConcurrentStreams = 1;

  @Param
  public ExecutorType clientExecutor = ExecutorType.DIRECT;

  @Param({"SMALL"})
  public MessageSize responseSize = MessageSize.SMALL;

  private static AtomicLong callCounter;
  private AtomicBoolean completed;
  private AtomicBoolean record;
  private CountDownLatch latch;
  /**
   * Use an AuxCounters instance so we can measure the calls as they occur without consuming CPU
   * in the benchmark method.
   */
  @AuxCounters
  @State(Scope.Thread)
  public static class AdditionalCounters {

    @Setup(Level.Iteration)
    public void clean() {
      callCounter.set(0);
    }

    public long messagesPerSecond() {
      return callCounter.get();
    }
  }
  /**
   * Setup with direct executors, small payloads and the default flow-control window.
   */
  @Setup(Level.Trial)
  public void setup() throws Exception {
    super.setup(clientExecutor,
        ExecutorType.DIRECT,
        MessageSize.SMALL,
        responseSize,
        FlowWindowSize.MEDIUM,
        ChannelType.NIO,
        maxConcurrentStreams,
        channelCount);
    callCounter = new AtomicLong();
    completed = new AtomicBoolean();
    record = new AtomicBoolean();
    latch =
        startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, record, completed, 1);
  }

  /**
   * Stop the running calls then stop the server and client channels.
   */
  @Override
  @TearDown(Level.Trial)
  public void teardown() throws Exception {
    completed.set(true);
    if (!latch.await(5, TimeUnit.SECONDS)) {
      logger.warning("Failed to shutdown all calls.");
    }
    super.teardown();
  }
  /**
   * Measure the rate of messages received. The calls are already running; we just observe a
   * counter of received responses.
   */
  @Benchmark
  public void stream(AdditionalCounters counters) throws Exception {
    record.set(true);
    // No need to do anything, just sleep here.
    Thread.sleep(1001);
    record.set(false);
  }

  /**
   * Useful for triggering a subset of the benchmark in a profiler.
   */
  public static void main(String[] argv) throws Exception {
    FlowControlledMessagesPerSecondBenchmark bench = new FlowControlledMessagesPerSecondBenchmark();
    bench.setup();
    Thread.sleep(30000);
    bench.teardown();
    System.exit(0);
  }
}