Add support to Netty builders for other channel types. Demonstrate and test use of this with local channels

This commit is contained in:
Louis Ryan 2015-03-12 11:43:26 -07:00
parent 89cb2d1774
commit 1216de6262
6 changed files with 125 additions and 32 deletions

View File

@ -0,0 +1,70 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.testing.integration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import io.grpc.ChannelImpl;
import io.grpc.transport.netty.NegotiationType;
import io.grpc.transport.netty.NettyChannelBuilder;
import io.grpc.transport.netty.NettyServerBuilder;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
/**
* Run transport tests over the Netty in-process channel.
*/
public class Http2NettyLocalChannelTest extends AbstractTransportTest {
@BeforeClass
public static void startServer() {
startStaticServer(
NettyServerBuilder
.forAddress(new LocalAddress("in-process-1"))
.channelType(LocalServerChannel.class));
}
@AfterClass
public static void stopServer() {
stopStaticServer();
}
@Override
protected ChannelImpl createChannel() {
return NettyChannelBuilder
.forAddress(new LocalAddress("in-process-1"))
.negotiationType(NegotiationType.PLAINTEXT)
.channelType(LocalChannel.class)
.build();
}
}

View File

@ -31,10 +31,14 @@
package io.grpc.transport.netty;
import com.google.common.base.Preconditions;
import io.grpc.AbstractChannelBuilder;
import io.grpc.SharedResourceHolder;
import io.grpc.transport.ClientTransportFactory;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import java.net.InetSocketAddress;
@ -48,6 +52,7 @@ public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChann
private final SocketAddress serverAddress;
private NegotiationType negotiationType = NegotiationType.TLS;
private Class<? extends Channel> channelType = NioSocketChannel.class;
private EventLoopGroup userEventLoopGroup;
private SslContext sslContext;
@ -69,6 +74,14 @@ public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChann
this.serverAddress = serverAddress;
}
/**
* Specify the channel type to use, by default we use {@link NioSocketChannel}.
*/
public NettyChannelBuilder channelType(Class<? extends Channel> channelType) {
this.channelType = Preconditions.checkNotNull(channelType);
return this;
}
/**
* Sets the negotiation type for the HTTP/2 connection.
*
@ -104,7 +117,7 @@ public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChann
final EventLoopGroup group = (userEventLoopGroup == null)
? SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP) : userEventLoopGroup;
ClientTransportFactory transportFactory = new NettyClientTransportFactory(
serverAddress, negotiationType, group, sslContext);
serverAddress, channelType, negotiationType, group, sslContext);
Runnable terminationRunnable = null;
if (userEventLoopGroup == null) {
terminationRunnable = new Runnable() {

View File

@ -47,8 +47,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
@ -85,6 +83,7 @@ class NettyClientTransport implements ClientTransport {
private static final Logger log = Logger.getLogger(NettyClientTransport.class.getName());
private final SocketAddress address;
private final Class<? extends Channel> channelType;
private final EventLoopGroup group;
private final Http2Negotiator.Negotiation negotiation;
private final NettyClientHandler handler;
@ -109,22 +108,23 @@ class NettyClientTransport implements ClientTransport {
@GuardedBy("this")
private boolean terminated;
NettyClientTransport(SocketAddress address, NegotiationType negotiationType,
EventLoopGroup group, SslContext sslContext) {
NettyClientTransport(SocketAddress address, Class<? extends Channel> channelType,
NegotiationType negotiationType, EventLoopGroup group, SslContext sslContext) {
Preconditions.checkNotNull(negotiationType, "negotiationType");
this.address = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");
this.channelType = Preconditions.checkNotNull(channelType, "channelType");
InetSocketAddress inetAddress = null;
if (address instanceof InetSocketAddress) {
inetAddress = (InetSocketAddress) address;
authority = new AsciiString(inetAddress.getHostString() + ":" + inetAddress.getPort());
} else if (address instanceof LocalAddress) {
authority = new AsciiString(address.toString());
Preconditions.checkArgument(negotiationType != NegotiationType.TLS,
"TLS not supported for in-process transport");
} else {
throw new IllegalStateException("Unknown socket address type " + address.toString());
Preconditions.checkState(negotiationType != NegotiationType.TLS,
"TLS not supported for non-internet socket types");
// Specialized address types are allowed to support custom Channel types so just assume their
// toString() values are valid :authority values
authority = new AsciiString(address.toString());
}
DefaultHttp2StreamRemovalPolicy streamRemovalPolicy = new DefaultHttp2StreamRemovalPolicy();
@ -201,10 +201,8 @@ class NettyClientTransport implements ClientTransport {
listener = Preconditions.checkNotNull(transportListener, "listener");
Bootstrap b = new Bootstrap();
b.group(group);
if (address instanceof LocalAddress) {
b.channel(LocalChannel.class);
} else {
b.channel(NioSocketChannel.class);
b.channel(channelType);
if (NioSocketChannel.class.isAssignableFrom(channelType)) {
b.option(SO_KEEPALIVE, true);
}
b.handler(negotiation.initializer());

View File

@ -34,6 +34,7 @@ package io.grpc.transport.netty;
import com.google.common.base.Preconditions;
import io.grpc.transport.ClientTransportFactory;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.ssl.SslContext;
@ -46,19 +47,21 @@ class NettyClientTransportFactory implements ClientTransportFactory {
private final SocketAddress address;
private final NegotiationType negotiationType;
private final Class<? extends Channel> channelType;
private final EventLoopGroup group;
private final SslContext sslContext;
public NettyClientTransportFactory(SocketAddress address, NegotiationType negotiationType,
EventLoopGroup group, SslContext sslContext) {
public NettyClientTransportFactory(SocketAddress address, Class<? extends Channel> channelType,
NegotiationType negotiationType, EventLoopGroup group, SslContext sslContext) {
this.address = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");
this.negotiationType = Preconditions.checkNotNull(negotiationType, "negotiationType");
this.channelType = Preconditions.checkNotNull(channelType, "channelType");
this.sslContext = sslContext;
}
@Override
public NettyClientTransport newClientTransport() {
return new NettyClientTransport(address, negotiationType, group, sslContext);
return new NettyClientTransport(address, channelType, negotiationType, group, sslContext);
}
}

View File

@ -44,8 +44,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
@ -59,22 +58,25 @@ import javax.annotation.Nullable;
*/
public class NettyServer extends AbstractService {
private final SocketAddress address;
private final Class<? extends ServerChannel> channelType;
private final ChannelInitializer<Channel> channelInitializer;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private Channel channel;
public NettyServer(ServerListener serverListener, SocketAddress address, EventLoopGroup bossGroup,
public NettyServer(ServerListener serverListener, SocketAddress address,
Class<? extends ServerChannel> channelType, EventLoopGroup bossGroup,
EventLoopGroup workerGroup) {
this(serverListener, address, bossGroup, workerGroup, null);
this(serverListener, address, channelType, bossGroup, workerGroup, null);
}
public NettyServer(final ServerListener serverListener, SocketAddress address,
EventLoopGroup bossGroup,
Class<? extends ServerChannel> channelType, EventLoopGroup bossGroup,
EventLoopGroup workerGroup, @Nullable final SslContext sslContext) {
Preconditions.checkNotNull(bossGroup, "bossGroup");
Preconditions.checkNotNull(workerGroup, "workerGroup");
this.address = address;
this.channelType = Preconditions.checkNotNull(channelType, "channelType");
this.bossGroup = Preconditions.checkNotNull(bossGroup, "bossGroup");
this.workerGroup = Preconditions.checkNotNull(workerGroup, "workerGroup");
this.channelInitializer = new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
@ -83,18 +85,14 @@ public class NettyServer extends AbstractService {
// TODO(nmittler): Should we wait for transport shutdown before shutting down server?
}
};
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
}
@Override
protected void doStart() {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
if (address instanceof LocalAddress) {
b.channel(LocalServerChannel.class);
} else {
b.channel(NioServerSocketChannel.class);
b.channel(channelType);
if (NioServerSocketChannel.class.isAssignableFrom(channelType)) {
b.option(SO_BACKLOG, 128);
b.childOption(SO_KEEPALIVE, true);
}

View File

@ -31,6 +31,7 @@
package io.grpc.transport.netty;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
@ -39,6 +40,8 @@ import io.grpc.HandlerRegistry;
import io.grpc.SharedResourceHolder;
import io.grpc.transport.ServerListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.grpc.ServerImpl;
@ -51,7 +54,7 @@ import java.net.SocketAddress;
public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerBuilder> {
private final SocketAddress address;
private Class<? extends ServerChannel> channelType = NioServerSocketChannel.class;
private EventLoopGroup userBossEventLoopGroup;
private EventLoopGroup userWorkerEventLoopGroup;
private SslContext sslContext;
@ -101,6 +104,14 @@ public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerB
this.address = address;
}
/**
* Specify the channel type to use, by default we use {@link NioServerSocketChannel}.
*/
public NettyServerBuilder channelType(Class<? extends ServerChannel> channelType) {
this.channelType = Preconditions.checkNotNull(channelType);
return this;
}
/**
* Provides the boss EventGroupLoop to the server.
*
@ -162,7 +173,7 @@ public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerB
final EventLoopGroup workerEventLoopGroup = (userWorkerEventLoopGroup == null)
? SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP)
: userWorkerEventLoopGroup;
NettyServer server = new NettyServer(serverListener, address, bossEventLoopGroup,
NettyServer server = new NettyServer(serverListener, address, channelType, bossEventLoopGroup,
workerEventLoopGroup, sslContext);
if (userBossEventLoopGroup == null) {
server.addListener(new ClosureHook() {