Notify transportReady() in Netty

This commit is contained in:
Eric Anderson 2015-08-03 13:02:33 -07:00
parent 137b2ef8c3
commit 41d875c7e3
2 changed files with 22 additions and 0 deletions

View File

@ -44,6 +44,7 @@ import io.grpc.transport.ClientTransport.PingCallback;
import io.grpc.transport.Http2Ping;
import io.grpc.transport.HttpUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@ -75,6 +76,11 @@ import javax.annotation.Nullable;
*/
class NettyClientHandler extends Http2ConnectionHandler {
private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
/**
* A message that simply passes through the channel without any real processing. It is useful to
* check if buffers have been drained and test the health of the channel in a single operation.
*/
static final Object NOOP_MESSAGE = new Object();
private final Http2Connection.PropertyKey streamKey;
private final Ticker ticker;
@ -160,6 +166,8 @@ class NettyClientHandler extends Http2ConnectionHandler {
((RequestMessagesCommand) msg).requestMessages();
} else if (msg instanceof SendPingCommand) {
sendPingFrame(ctx, (SendPingCommand) msg, promise);
} else if (msg == NOOP_MESSAGE) {
ctx.write(Unpooled.EMPTY_BUFFER, promise);
} else {
throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
}

View File

@ -173,6 +173,20 @@ class NettyClientTransport implements ClientTransport {
channel = b.connect(address).channel();
// Start the write queue as soon as the channel is constructed
handler.startWriteQueue(channel);
// This write will have no effect, yet it will only complete once the negotiationHandler
// flushes any pending writes.
channel.write(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
listener.transportReady();
} else {
// Need to notify of this failure, because handler.connectionError() is not guaranteed to
// have seen this cause.
notifyTerminated(future.cause());
}
}
});
// Handle transport shutdown when the channel is closed.
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override