netty: Netty{Server,Channel}Builder requires all or none of ELG and ChannelType (#6014)

This commit is contained in:
Jihun Cho 2019-07-26 09:25:02 -07:00 committed by GitHub
parent 4114433674
commit 65109e6738
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 86 additions and 114 deletions

View File

@ -27,6 +27,7 @@ import io.grpc.benchmarks.proto.WorkerServiceGrpc;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -50,6 +51,7 @@ public class LoadWorker {
.build());
this.driverServer = NettyServerBuilder.forPort(driverPort)
.directExecutor()
.channelType(NioServerSocketChannel.class)
.workerEventLoopGroup(singleThreadGroup)
.bossEventLoopGroup(singleThreadGroup)
.addService(new WorkerServiceImpl())

View File

@ -21,9 +21,11 @@ import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import org.junit.After;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@ -33,13 +35,17 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class Http2NettyLocalChannelTest extends AbstractInteropTest {
private DefaultEventLoopGroup eventLoopGroup = new DefaultEventLoopGroup();
@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
return NettyServerBuilder
.forAddress(new LocalAddress("in-process-1"))
.flowControlWindow(65 * 1024)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.channelType(LocalServerChannel.class);
.channelType(LocalServerChannel.class)
.workerEventLoopGroup(eventLoopGroup)
.bossEventLoopGroup(eventLoopGroup);
}
@Override
@ -48,10 +54,19 @@ public class Http2NettyLocalChannelTest extends AbstractInteropTest {
.forAddress(new LocalAddress("in-process-1"))
.negotiationType(NegotiationType.PLAINTEXT)
.channelType(LocalChannel.class)
.eventLoopGroup(eventLoopGroup)
.flowControlWindow(65 * 1024)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
io.grpc.internal.TestingAccessor.setStatsImplementation(
builder, createClientCensusStatsModule());
return builder.build();
}
@Override
@After
@SuppressWarnings("FutureReturnValueIgnored")
public void tearDown() {
super.tearDown();
eventLoopGroup.shutdownGracefully();
}
}

View File

@ -53,8 +53,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
@ -66,7 +64,6 @@ import javax.net.ssl.SSLException;
@CanIgnoreReturnValue
public final class NettyChannelBuilder
extends AbstractManagedChannelImplBuilder<NettyChannelBuilder> {
private static final Logger logger = Logger.getLogger(NettyChannelBuilder.class.getName());
public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB
@ -397,6 +394,8 @@ public final class NettyChannelBuilder
@CheckReturnValue
@Internal
protected ClientTransportFactory buildTransportFactory() {
assertEventLoopAndChannelType();
ProtocolNegotiator negotiator;
if (protocolNegotiatorFactory != null) {
negotiator = protocolNegotiatorFactory.buildProtocolNegotiator();
@ -412,44 +411,22 @@ public final class NettyChannelBuilder
negotiator = createProtocolNegotiatorByType(negotiationType, localSslContext);
}
// TODO(jihuncho) throw exception if not groupOrChannelProvided after 1.22.0
ObjectPool<? extends EventLoopGroup> resolvedEventLoopGroupPool = eventLoopGroupPool;
ChannelFactory<? extends Channel> resolvedChannelFactory = channelFactory;
if (shouldFallBackToNio()) {
logger.log(
Level.WARNING,
"Both EventLoopGroup and ChannelType should be provided or neither should be, "
+ "otherwise client may not start. Not provided values will use Nio "
+ "(NioSocketChannel, NioEventLoopGroup) for compatibility. This will cause an "
+ "Exception in the future.");
if (eventLoopGroupPool == DEFAULT_EVENT_LOOP_GROUP_POOL) {
resolvedEventLoopGroupPool =
SharedResourcePool.forResource(Utils.NIO_WORKER_EVENT_LOOP_GROUP);
logger.log(Level.FINE, "Channel type or ChannelFactory is provided, but EventLoopGroup is "
+ "missing. Fall back to NioEventLoopGroup.");
}
if (channelFactory == DEFAULT_CHANNEL_FACTORY) {
resolvedChannelFactory = new ReflectiveChannelFactory<>(NioSocketChannel.class);
logger.log(
Level.FINE, "EventLoopGroup is provided, but Channel type or ChannelFactory is missing."
+ " Fall back to NioSocketChannel.");
}
}
return new NettyTransportFactory(
negotiator, resolvedChannelFactory, channelOptions,
resolvedEventLoopGroupPool, flowControlWindow, maxInboundMessageSize(),
negotiator, channelFactory, channelOptions,
eventLoopGroupPool, flowControlWindow, maxInboundMessageSize(),
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
transportTracerFactory, localSocketPicker);
}
@VisibleForTesting
boolean shouldFallBackToNio() {
return (channelFactory != DEFAULT_CHANNEL_FACTORY
&& eventLoopGroupPool == DEFAULT_EVENT_LOOP_GROUP_POOL)
|| (channelFactory == DEFAULT_CHANNEL_FACTORY
&& eventLoopGroupPool != DEFAULT_EVENT_LOOP_GROUP_POOL);
void assertEventLoopAndChannelType() {
boolean bothProvided = channelFactory != DEFAULT_CHANNEL_FACTORY
&& eventLoopGroupPool != DEFAULT_EVENT_LOOP_GROUP_POOL;
boolean nonProvided = channelFactory == DEFAULT_CHANNEL_FACTORY
&& eventLoopGroupPool == DEFAULT_EVENT_LOOP_GROUP_POOL;
checkState(
bothProvided || nonProvided,
"Both EventLoopGroup and ChannelType should be provided or neither should be");
}
@Override

View File

@ -18,6 +18,7 @@ package io.grpc.netty;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
@ -50,8 +51,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
@ -62,7 +61,6 @@ import javax.net.ssl.SSLException;
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1784")
@CanIgnoreReturnValue
public final class NettyServerBuilder extends AbstractServerImplBuilder<NettyServerBuilder> {
private static final Logger logger = Logger.getLogger(NettyServerBuilder.class.getName());
public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB
@ -496,50 +494,22 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
@CheckReturnValue
protected List<NettyServer> buildTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
assertEventLoopsAndChannelType();
ProtocolNegotiator negotiator = protocolNegotiator;
if (negotiator == null) {
negotiator = sslContext != null ? ProtocolNegotiators.serverTls(sslContext) :
ProtocolNegotiators.serverPlaintext();
}
Class<? extends ServerChannel> resolvedChannelType = channelType;
ObjectPool<? extends EventLoopGroup> resolvedBossGroupPool = bossEventLoopGroupPool;
ObjectPool<? extends EventLoopGroup> resolvedWorkerGroupPool = workerEventLoopGroupPool;
if (shouldFallBackToNio()) {
// TODO(jihuncho) throw exception if not groupOrChannelProvided after 1.22.0
// Use NIO based channel type and eventloop group for backward compatibility reason
logger.log(
Level.WARNING,
"All of BossEventLoopGroup, WorkerEventLoopGroup and ChannelType should be provided or "
+ "neither should be, otherwise server may not start. Missing values will use Nio "
+ "(NioServerSocketChannel, NioEventLoopGroup) for backward compatibility. "
+ "This will cause an Exception in the future.");
if (channelType == null) {
resolvedChannelType = NioServerSocketChannel.class;
logger.log(Level.FINE, "One or more EventLoopGroup is provided, but Channel type is "
+ "missing. Fall back to NioServerSocketChannel.");
}
if (bossEventLoopGroupPool == DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL) {
resolvedBossGroupPool = SharedResourcePool.forResource(Utils.NIO_BOSS_EVENT_LOOP_GROUP);
logger.log(Level.FINE, "Channel type and/or WorkerEventLoopGroup is provided, but "
+ "BossEventLoopGroup is missing. Fall back to NioEventLoopGroup.");
}
if (workerEventLoopGroupPool == DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL) {
resolvedWorkerGroupPool = SharedResourcePool.forResource(Utils.NIO_WORKER_EVENT_LOOP_GROUP);
logger.log(Level.FINE, "Channel type and/or BossEventLoopGroup is provided, but "
+ "BossEventLoopGroup is missing. Fall back to NioEventLoopGroup.");
}
}
if (resolvedChannelType == null) {
resolvedChannelType = Utils.DEFAULT_SERVER_CHANNEL_TYPE;
}
Class<? extends ServerChannel> resolvedChannelType =
channelType == null ? Utils.DEFAULT_SERVER_CHANNEL_TYPE : channelType;
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
for (SocketAddress listenAddress : listenAddresses) {
NettyServer transportServer = new NettyServer(
listenAddress, resolvedChannelType, channelOptions, resolvedBossGroupPool,
resolvedWorkerGroupPool, negotiator, streamTracerFactories,
listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,
workerEventLoopGroupPool, negotiator, streamTracerFactories,
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
@ -550,14 +520,17 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
}
@VisibleForTesting
boolean shouldFallBackToNio() {
boolean hasNonDefault = channelType != null
|| bossEventLoopGroupPool != DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL
|| workerEventLoopGroupPool != DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
boolean hasDefault = channelType == null
|| bossEventLoopGroupPool == DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL
|| workerEventLoopGroupPool == DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
return hasNonDefault && hasDefault;
void assertEventLoopsAndChannelType() {
boolean allProvided = channelType != null
&& bossEventLoopGroupPool != DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL
&& workerEventLoopGroupPool != DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
boolean nonProvided = channelType == null
&& bossEventLoopGroupPool == DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL
&& workerEventLoopGroupPool == DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
checkState(
allProvided || nonProvided,
"All of BossEventLoopGroup, WorkerEventLoopGroup and ChannelType should be provided or "
+ "neither should be");
}
@Override

View File

@ -17,7 +17,6 @@
package io.grpc.netty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@ -201,60 +200,61 @@ public class NettyChannelBuilderTest {
}
@Test
public void shouldFallBackToNio_onlyGroupProvided() {
public void assertEventLoopAndChannelType_onlyGroupProvided() {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget");
builder.eventLoopGroup(mock(EventLoopGroup.class));
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Both EventLoopGroup and ChannelType should be provided");
assertTrue(builder.shouldFallBackToNio());
builder.assertEventLoopAndChannelType();
}
@Test
public void shouldFallBackToNio_onlyTypeProvided() {
public void assertEventLoopAndChannelType_onlyTypeProvided() {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget");
builder.channelType(LocalChannel.class);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Both EventLoopGroup and ChannelType should be provided");
assertTrue(builder.shouldFallBackToNio());
builder.assertEventLoopAndChannelType();
}
@Test
public void shouldFallBackToNio_onlyFactoryProvided() {
public void assertEventLoopAndChannelType_onlyFactoryProvided() {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget");
builder.channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel() {
return null;
}
});
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Both EventLoopGroup and ChannelType should be provided");
assertTrue(builder.shouldFallBackToNio());
builder.assertEventLoopAndChannelType();
}
@Test
public void shouldFallBackToNio_usingDefault() {
public void assertEventLoopAndChannelType_usingDefault() {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget");
assertFalse(builder.shouldFallBackToNio());
builder.assertEventLoopAndChannelType();
}
@Test
public void shouldFallBackToNio_bothProvided() {
public void assertEventLoopAndChannelType_bothProvided() {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget");
builder.eventLoopGroup(mock(EventLoopGroup.class));
builder.channelType(LocalChannel.class);
assertFalse(builder.shouldFallBackToNio());
builder.assertEventLoopAndChannelType();
}
@Test
public void useNioTransport_shouldNotFallBack() {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget");
InternalNettyChannelBuilder.useNioTransport(builder);
assertFalse(builder.shouldFallBackToNio());
builder.assertEventLoopAndChannelType();
}
}

View File

@ -16,8 +16,6 @@
package io.grpc.netty;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -134,50 +132,57 @@ public class NettyServerBuilderTest {
}
@Test
public void shouldFallBackToNio_onlyBossGroupProvided() {
public void assertEventLoopsAndChannelType_onlyBossGroupProvided() {
EventLoopGroup mockEventLoopGroup = mock(EventLoopGroup.class);
builder.bossEventLoopGroup(mockEventLoopGroup);
thrown.expect(IllegalStateException.class);
thrown.expectMessage(
"All of BossEventLoopGroup, WorkerEventLoopGroup and ChannelType should be provided");
assertTrue(builder.shouldFallBackToNio());
builder.assertEventLoopsAndChannelType();
}
@Test
public void shouldFallBackToNio_onlyWorkerGroupProvided() {
public void assertEventLoopsAndChannelType_onlyWorkerGroupProvided() {
EventLoopGroup mockEventLoopGroup = mock(EventLoopGroup.class);
builder.workerEventLoopGroup(mockEventLoopGroup);
thrown.expect(IllegalStateException.class);
thrown.expectMessage(
"All of BossEventLoopGroup, WorkerEventLoopGroup and ChannelType should be provided");
assertTrue(builder.shouldFallBackToNio());
builder.assertEventLoopsAndChannelType();
}
@Test
public void shouldFallBackToNio_onlyTypeProvided() {
public void assertEventLoopsAndChannelType_onlyTypeProvided() {
builder.channelType(LocalServerChannel.class);
thrown.expect(IllegalStateException.class);
thrown.expectMessage(
"All of BossEventLoopGroup, WorkerEventLoopGroup and ChannelType should be provided");
assertTrue(builder.shouldFallBackToNio());
builder.assertEventLoopsAndChannelType();
}
@Test
public void shouldFallBackToNio_usingDefault() {
assertFalse(builder.shouldFallBackToNio());
public void assertEventLoopsAndChannelType_usingDefault() {
builder.assertEventLoopsAndChannelType();
}
@Test
public void shouldFallBackToNio_allProvided() {
public void assertEventLoopsAndChannelType_allProvided() {
EventLoopGroup mockEventLoopGroup = mock(EventLoopGroup.class);
builder.bossEventLoopGroup(mockEventLoopGroup);
builder.workerEventLoopGroup(mockEventLoopGroup);
builder.channelType(LocalServerChannel.class);
assertFalse(builder.shouldFallBackToNio());
builder.assertEventLoopsAndChannelType();
}
@Test
public void useNioTransport_shouldNotFallBack() {
public void useNioTransport_shouldNotThrow() {
InternalNettyServerBuilder.useNioTransport(builder);
assertFalse(builder.shouldFallBackToNio());
builder.assertEventLoopsAndChannelType();
}
}