From 41d875c7e36f75f62b78271e77b560488d58a68a Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Mon, 3 Aug 2015 13:02:33 -0700 Subject: [PATCH] Notify transportReady() in Netty --- .../grpc/transport/netty/NettyClientHandler.java | 8 ++++++++ .../grpc/transport/netty/NettyClientTransport.java | 14 ++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientHandler.java index 1f169179d4..9125453fbc 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientHandler.java @@ -44,6 +44,7 @@ import io.grpc.transport.ClientTransport.PingCallback; import io.grpc.transport.Http2Ping; import io.grpc.transport.HttpUtil; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -75,6 +76,11 @@ import javax.annotation.Nullable; */ class NettyClientHandler extends Http2ConnectionHandler { private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName()); + /** + * A message that simply passes through the channel without any real processing. It is useful to + * check if buffers have been drained and test the health of the channel in a single operation. + */ + static final Object NOOP_MESSAGE = new Object(); private final Http2Connection.PropertyKey streamKey; private final Ticker ticker; @@ -160,6 +166,8 @@ class NettyClientHandler extends Http2ConnectionHandler { ((RequestMessagesCommand) msg).requestMessages(); } else if (msg instanceof SendPingCommand) { sendPingFrame(ctx, (SendPingCommand) msg, promise); + } else if (msg == NOOP_MESSAGE) { + ctx.write(Unpooled.EMPTY_BUFFER, promise); } else { throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName()); } diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java index 3a9e81b9dd..497e838820 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java @@ -173,6 +173,20 @@ class NettyClientTransport implements ClientTransport { channel = b.connect(address).channel(); // Start the write queue as soon as the channel is constructed handler.startWriteQueue(channel); + // This write will have no effect, yet it will only complete once the negotiationHandler + // flushes any pending writes. + channel.write(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + listener.transportReady(); + } else { + // Need to notify of this failure, because handler.connectionError() is not guaranteed to + // have seen this cause. + notifyTerminated(future.cause()); + } + } + }); // Handle transport shutdown when the channel is closed. channel.closeFuture().addListener(new ChannelFutureListener() { @Override