netty: Rely on ChannelFactory in NettyServer instead of dynamic classes

Fixes #5649
This commit is contained in:
Anar Sultanov 2019-09-18 00:59:29 +02:00 committed by Eric Anderson
parent fd4f189d2b
commit ba0fd84d79
6 changed files with 70 additions and 31 deletions

View File

@ -38,6 +38,7 @@ import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.TransportTracer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
@ -66,7 +67,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final InternalLogId logId;
private final SocketAddress address;
private final Class<? extends ServerChannel> channelType;
private final ChannelFactory<? extends ServerChannel> channelFactory;
private final Map<ChannelOption<?>, ?> channelOptions;
private final ProtocolNegotiator protocolNegotiator;
private final int maxStreamsPerConnection;
@ -95,7 +96,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
new AtomicReference<>();
NettyServer(
SocketAddress address, Class<? extends ServerChannel> channelType,
SocketAddress address, ChannelFactory<? extends ServerChannel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions,
ObjectPool<? extends EventLoopGroup> bossGroupPool,
ObjectPool<? extends EventLoopGroup> workerGroupPool,
@ -109,7 +110,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
InternalChannelz channelz) {
this.address = address;
this.channelType = checkNotNull(channelType, "channelType");
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
checkNotNull(channelOptions, "channelOptions");
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
@ -155,7 +156,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(channelType);
b.channelFactory(channelFactory);
// For non-socket based channel, the option will be ignored.
b.option(SO_BACKLOG, 128);
b.childOption(SO_KEEPALIVE, true);
@ -170,7 +171,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
b.childHandler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
public void initChannel(Channel ch) {
ChannelPromise channelDone = ch.newPromise();
@ -217,7 +218,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
* Releases the event loop if the channel is "done", possibly due to the channel closing.
*/
final class LoopReleaser implements ChannelFutureListener {
boolean done;
private boolean done;
@Override
public void operationComplete(ChannelFuture future) throws Exception {

View File

@ -25,7 +25,6 @@ import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.ExperimentalApi;
import io.grpc.Internal;
@ -36,8 +35,10 @@ import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ReflectiveChannelFactory;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
@ -79,7 +80,9 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
private final List<SocketAddress> listenAddresses = new ArrayList<>();
private Class<? extends ServerChannel> channelType = null;
private ChannelFactory<? extends ServerChannel> channelFactory =
Utils.DEFAULT_SERVER_CHANNEL_FACTORY;
private final Map<ChannelOption<?>, Object> channelOptions = new HashMap<>();
private ObjectPool<? extends EventLoopGroup> bossEventLoopGroupPool =
DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL;
@ -91,7 +94,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
private long keepAliveTimeoutInNanos = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
private long maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;
private long maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED;
@ -142,8 +145,14 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
}
/**
* Specify the channel type to use, by default we use {@link NioServerSocketChannel} or {@code
* EpollServerSocketChannel}.
* Specifies the channel type to use, by default we use {@code EpollServerSocketChannel} if
* available, otherwise using {@link NioServerSocketChannel}.
*
* <p>You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
* {@link ServerChannel} implementation has no no-args constructor.
*
* <p>It's an optional parameter. If the user has not provided an Channel type or ChannelFactory
* when the channel is built, the builder will use the default one which is static.
*
* <p>You must also provide corresponding {@link EventLoopGroup} using {@link
* #workerEventLoopGroup(EventLoopGroup)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For
@ -151,7 +160,26 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
* io.netty.channel.nio.NioEventLoopGroup}, otherwise your server won't start.
*/
public NettyServerBuilder channelType(Class<? extends ServerChannel> channelType) {
this.channelType = Preconditions.checkNotNull(channelType, "channelType");
checkNotNull(channelType, "channelType");
return channelFactory(new ReflectiveChannelFactory<>(channelType));
}
/**
* Specifies the {@link ChannelFactory} to create {@link ServerChannel} instances. This method is
* usually only used if the specific {@code ServerChannel} requires complex logic which requires
* additional information to create the {@code ServerChannel}. Otherwise, recommend to use {@link
* #channelType(Class)}.
*
* <p>It's an optional parameter. If the user has not provided an Channel type or ChannelFactory
* when the channel is built, the builder will use the default one which is static.
*
* <p>You must also provide corresponding {@link EventLoopGroup} using {@link
* #workerEventLoopGroup(EventLoopGroup)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For
* example, if the factory creates {@link NioServerSocketChannel} you must use {@link
* io.netty.channel.nio.NioEventLoopGroup}, otherwise your server won't start.
*/
public NettyServerBuilder channelFactory(ChannelFactory<? extends ServerChannel> channelFactory) {
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
return this;
}
@ -499,16 +527,13 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
ProtocolNegotiator negotiator = protocolNegotiator;
if (negotiator == null) {
negotiator = sslContext != null ? ProtocolNegotiators.serverTls(sslContext) :
ProtocolNegotiators.serverPlaintext();
ProtocolNegotiators.serverPlaintext();
}
Class<? extends ServerChannel> resolvedChannelType =
channelType == null ? Utils.DEFAULT_SERVER_CHANNEL_TYPE : channelType;
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
for (SocketAddress listenAddress : listenAddresses) {
NettyServer transportServer = new NettyServer(
listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,
listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool,
workerEventLoopGroupPool, negotiator, streamTracerFactories,
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
@ -521,10 +546,10 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
@VisibleForTesting
void assertEventLoopsAndChannelType() {
boolean allProvided = channelType != null
boolean allProvided = channelFactory != Utils.DEFAULT_SERVER_CHANNEL_FACTORY
&& bossEventLoopGroupPool != DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL
&& workerEventLoopGroupPool != DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
boolean nonProvided = channelType == null
boolean nonProvided = channelFactory == Utils.DEFAULT_SERVER_CHANNEL_FACTORY
&& bossEventLoopGroupPool == DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL
&& workerEventLoopGroupPool == DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
checkState(

View File

@ -36,8 +36,10 @@ import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2InboundHeaders;
import io.grpc.netty.NettySocketSupport.NativeSocketOptions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ReflectiveChannelFactory;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@ -81,7 +83,7 @@ class Utils {
public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP;
public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP;
public static final Class<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_TYPE;
public static final ChannelFactory<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_FACTORY;
public static final Class<? extends Channel> DEFAULT_CLIENT_CHANNEL_TYPE;
@Nullable
@ -90,8 +92,8 @@ class Utils {
static {
// Decide default channel types and EventLoopGroup based on Epoll availability
if (isEpollAvailable()) {
DEFAULT_SERVER_CHANNEL_TYPE = epollServerChannelType();
DEFAULT_CLIENT_CHANNEL_TYPE = epollChannelType();
DEFAULT_SERVER_CHANNEL_FACTORY = new ReflectiveChannelFactory<>(epollServerChannelType());
EPOLL_EVENT_LOOP_GROUP_CONSTRUCTOR = epollEventLoopGroupConstructor();
DEFAULT_BOSS_EVENT_LOOP_GROUP
= new DefaultEventLoopGroupResource(1, "grpc-default-boss-ELG", EventLoopGroupType.EPOLL);
@ -99,7 +101,7 @@ class Utils {
= new DefaultEventLoopGroupResource(0,"grpc-default-worker-ELG", EventLoopGroupType.EPOLL);
} else {
logger.log(Level.FINE, "Epoll is not available, using Nio.", getEpollUnavailabilityCause());
DEFAULT_SERVER_CHANNEL_TYPE = NioServerSocketChannel.class;
DEFAULT_SERVER_CHANNEL_FACTORY = nioServerChannelFactory();
DEFAULT_CLIENT_CHANNEL_TYPE = NioSocketChannel.class;
DEFAULT_BOSS_EVENT_LOOP_GROUP = NIO_BOSS_EVENT_LOOP_GROUP;
DEFAULT_WORKER_EVENT_LOOP_GROUP = NIO_WORKER_EVENT_LOOP_GROUP;
@ -290,6 +292,15 @@ class Utils {
}
}
private static ChannelFactory<ServerChannel> nioServerChannelFactory() {
return new ChannelFactory<ServerChannel>() {
@Override
public ServerChannel newChannel() {
return new NioServerSocketChannel();
}
};
}
/**
* Returns TCP_USER_TIMEOUT channel option for Epoll channel if Epoll is available, otherwise
* null.

View File

@ -719,7 +719,7 @@ public class NettyClientTransportTest {
private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
server = new NettyServer(
TestUtils.testServerAddress(new InetSocketAddress(0)),
NioServerSocketChannel.class,
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),

View File

@ -58,7 +58,7 @@ public class NettyServerTest {
InetSocketAddress addr = new InetSocketAddress(0);
NettyServer ns = new NettyServer(
addr,
Utils.DEFAULT_SERVER_CHANNEL_TYPE,
Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
@ -96,7 +96,7 @@ public class NettyServerTest {
InetSocketAddress addr = new InetSocketAddress(0);
NettyServer ns = new NettyServer(
addr,
Utils.DEFAULT_SERVER_CHANNEL_TYPE,
Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
@ -134,7 +134,7 @@ public class NettyServerTest {
InetSocketAddress addr = new InetSocketAddress(0);
NettyServer ns = new NettyServer(
addr,
Utils.DEFAULT_SERVER_CHANNEL_TYPE,
Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
channelOptions,
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
@ -184,7 +184,7 @@ public class NettyServerTest {
InetSocketAddress addr = new InetSocketAddress(0);
NettyServer ns = new NettyServer(
addr,
Utils.DEFAULT_SERVER_CHANNEL_TYPE,
Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),

View File

@ -29,9 +29,11 @@ import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
@ -204,13 +206,13 @@ public class UtilsTest {
}
@Test
public void defaultServerChannelType_whenEpollIsAvailable() {
public void defaultServerChannelFactory_whenEpollIsAvailable() {
assume().that(Utils.isEpollAvailable()).isTrue();
Class<? extends Channel> clientChannelType = Utils.DEFAULT_SERVER_CHANNEL_TYPE;
ChannelFactory<? extends ServerChannel> channelFactory = Utils.DEFAULT_SERVER_CHANNEL_FACTORY;
assertThat(clientChannelType.getName())
.isEqualTo("io.netty.channel.epoll.EpollServerSocketChannel");
assertThat(channelFactory.toString())
.isEqualTo("ReflectiveChannelFactory(EpollServerSocketChannel.class)");
}
@Test