benchmarks: reuse executor between channels

Perhaps by accident, each LoadClient channel got its own executor
pool rather than sharing between all of them.  The benchmarks are
currently set up with C++ idioms, creating 64 "channels" per
client.  C++ uses one thread per channel, while java does not,
leading to a threading model mismatch.

ForkJoinPool is the current executor because it has good contention
characteristics.   However, FJP is also very sensitive to the
number of actively running tasks.  Too many, and the contention
will go up.  Too few, and threads will repeatedly go to sleep and
wake up.

This change basically makes the FJP shared for all clients in the
same process.  It doesn't try at all to shut it down properly or
be generally useful; it's just a benchmark.  Additionally, this
also groups the Netty and OkHttp specific channel options into
helper functions.  This means OkHttp benchmarks will also use
the correct executor now.

A quick test shows QPS going from ~107kqps to 149kqps.

Fixes: #2406
This commit is contained in:
Carl Mastrangelo 2016-11-08 13:50:06 -08:00
parent 809cf1bc8c
commit 944879dbbb
2 changed files with 109 additions and 86 deletions

View File

@ -33,11 +33,11 @@ package io.grpc.benchmarks;
import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.UncaughtExceptionHandlers;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.benchmarks.proto.Messages;
import io.grpc.benchmarks.proto.Messages.Payload;
@ -50,7 +50,6 @@ import io.grpc.netty.NettyChannelBuilder;
import io.grpc.okhttp.OkHttpChannelBuilder;
import io.grpc.okhttp.internal.Platform;
import io.grpc.testing.TestUtils;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
@ -60,6 +59,7 @@ import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.HdrHistogram.Histogram;
@ -70,10 +70,10 @@ import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
@ -136,117 +136,140 @@ public final class Utils {
}
}
/**
* Create a {@link ManagedChannel} for the given parameters.
*/
public static ManagedChannel newClientChannel(Transport transport, SocketAddress address,
boolean tls, boolean testca, @Nullable String authorityOverride, boolean useDefaultCiphers,
int flowControlWindow, boolean directExecutor) throws IOException {
if (transport == Transport.OK_HTTP) {
InetSocketAddress addr = (InetSocketAddress) address;
OkHttpChannelBuilder builder = OkHttpChannelBuilder
.forAddress(addr.getHostName(), addr.getPort());
if (directExecutor) {
builder.directExecutor();
}
builder.negotiationType(tls ? io.grpc.okhttp.NegotiationType.TLS
: io.grpc.okhttp.NegotiationType.PLAINTEXT);
if (tls) {
SSLSocketFactory factory;
if (testca) {
builder.overrideAuthority(
GrpcUtil.authorityFromHostAndPort(authorityOverride, addr.getPort()));
try {
factory = TestUtils.newSslSocketFactoryForCa(
Platform.get().getProvider(),
TestUtils.loadCert("ca.pem"));
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
private static OkHttpChannelBuilder newOkhttpClientChannel(
SocketAddress address, boolean tls, boolean testca, @Nullable String authorityOverride) {
InetSocketAddress addr = (InetSocketAddress) address;
OkHttpChannelBuilder builder =
OkHttpChannelBuilder.forAddress(addr.getHostName(), addr.getPort());
if (tls) {
builder.negotiationType(io.grpc.okhttp.NegotiationType.TLS);
SSLSocketFactory factory;
if (testca) {
builder.overrideAuthority(
GrpcUtil.authorityFromHostAndPort(authorityOverride, addr.getPort()));
try {
factory = TestUtils.newSslSocketFactoryForCa(
Platform.get().getProvider(),
TestUtils.loadCert("ca.pem"));
} catch (Exception e) {
throw new RuntimeException(e);
}
builder.sslSocketFactory(factory);
} else {
factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
}
if (authorityOverride != null) {
builder.overrideAuthority(authorityOverride);
builder.sslSocketFactory(factory);
} else {
builder.negotiationType(io.grpc.okhttp.NegotiationType.PLAINTEXT);
}
return builder;
}
private static NettyChannelBuilder newNettyClientChannel(Transport transport,
SocketAddress address, boolean tls, boolean testca, int flowControlWindow,
boolean useDefaultCiphers) throws IOException {
NettyChannelBuilder builder =
NettyChannelBuilder.forAddress(address).flowControlWindow(flowControlWindow);
if (tls) {
builder.negotiationType(NegotiationType.TLS);
SslContext sslContext = null;
if (testca) {
File cert = TestUtils.loadCert("ca.pem");
SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient().trustManager(cert);
if (transport == Transport.NETTY_NIO) {
sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, SslProvider.JDK);
} else {
// Native transport with OpenSSL
sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL);
}
if (useDefaultCiphers) {
sslContextBuilder.ciphers(null);
}
sslContext = sslContextBuilder.build();
}
return builder.build();
builder.sslContext(sslContext);
} else {
builder.negotiationType(NegotiationType.PLAINTEXT);
}
// It's a Netty transport.
SslContext sslContext = null;
NegotiationType negotiationType = tls ? NegotiationType.TLS : NegotiationType.PLAINTEXT;
if (tls && testca) {
File cert = TestUtils.loadCert("ca.pem");
SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient().trustManager(cert);
if (transport == Transport.NETTY_NIO) {
sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, SslProvider.JDK);
} else {
// Native transport with OpenSSL
sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL);
}
if (useDefaultCiphers) {
sslContextBuilder.ciphers(null);
}
sslContext = sslContextBuilder.build();
}
final EventLoopGroup group;
final Class<? extends io.netty.channel.Channel> channelType;
ThreadFactory tf = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("ELG-%d")
.build();
DefaultThreadFactory tf = new DefaultThreadFactory("client-elg-", true /*daemon */);
switch (transport) {
case NETTY_NIO:
group = new NioEventLoopGroup(0, tf);
channelType = NioSocketChannel.class;
builder
.eventLoopGroup(new NioEventLoopGroup(0, tf))
.channelType(NioSocketChannel.class);
break;
case NETTY_EPOLL:
// These classes only work on Linux.
group = new EpollEventLoopGroup(0, tf);
channelType = EpollSocketChannel.class;
builder
.eventLoopGroup(new EpollEventLoopGroup(0, tf))
.channelType(EpollSocketChannel.class);
break;
case NETTY_UNIX_DOMAIN_SOCKET:
// These classes only work on Linux.
group = new EpollEventLoopGroup(0, tf);
channelType = EpollDomainSocketChannel.class;
builder
.eventLoopGroup(new EpollEventLoopGroup(0, tf))
.channelType(EpollDomainSocketChannel.class);
break;
default:
// Should never get here.
throw new IllegalArgumentException("Unsupported transport: " + transport);
}
NettyChannelBuilder builder = NettyChannelBuilder
.forAddress(address)
.eventLoopGroup(group)
.channelType(channelType)
.negotiationType(negotiationType)
.sslContext(sslContext)
.flowControlWindow(flowControlWindow);
if (authorityOverride != null) {
builder.overrideAuthority(authorityOverride);
}
if (directExecutor) {
builder.directExecutor();
} else {
// TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be
// put. Move it somewhere else, or remove it if no longer necessary.
// See: https://github.com/grpc/grpc-java/issues/2119
builder.executor(new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
return builder;
}
private static ExecutorService clientExecutor;
private static synchronized ExecutorService getExecutor() {
if (clientExecutor == null) {
clientExecutor = new ForkJoinPool(
Runtime.getRuntime().availableProcessors(),
new ForkJoinWorkerThreadFactory() {
final AtomicInteger num = new AtomicInteger();
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setDaemon(true);
thread.setName("grpc-server-app-" + "-" + num.getAndIncrement());
thread.setName("grpc-client-app-" + "-" + num.getAndIncrement());
return thread;
}
}, UncaughtExceptionHandlers.systemExit(), true /* async */));
}, UncaughtExceptionHandlers.systemExit(), true /* async */);
}
return clientExecutor;
}
/**
* Create a {@link ManagedChannel} for the given parameters.
*/
public static ManagedChannel newClientChannel(Transport transport, SocketAddress address,
boolean tls, boolean testca, @Nullable String authorityOverride, boolean useDefaultCiphers,
int flowControlWindow, boolean directExecutor) {
ManagedChannelBuilder<?> builder;
if (transport == Transport.OK_HTTP) {
builder = newOkhttpClientChannel(address, tls, testca, authorityOverride);
} else {
try {
builder = newNettyClientChannel(
transport, address, tls, testca, flowControlWindow, useDefaultCiphers);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
if (authorityOverride != null) {
builder.overrideAuthority(authorityOverride);
}
if (directExecutor) {
builder.directExecutor();
} else {
// TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be
// put. Move it somewhere else, or remove it if no longer necessary.
// See: https://github.com/grpc/grpc-java/issues/2119
builder.executor(getExecutor());
}
return builder.build();
}

View File

@ -60,7 +60,7 @@ import io.netty.buffer.PooledByteBufAllocator;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
@ -165,7 +165,7 @@ final class LoadServer {
}
}
Executor getExecutor(int asyncThreads) {
ExecutorService getExecutor(int asyncThreads) {
// TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be
// put. Move it somewhere else, or remove it if no longer necessary.
// See: https://github.com/grpc/grpc-java/issues/2119