diff --git a/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java b/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java index e9f7040564..e10150cd13 100644 --- a/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java +++ b/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java @@ -43,14 +43,25 @@ final class ClientTransportLifecycleManager { listener.transportReady(); } - public void notifyShutdown(Status s) { + /** + * Marks transport as shutdown, but does not set the error status. This must eventually be + * followed by a call to notifyShutdown. + */ + public void notifyGracefulShutdown(Status s) { if (transportShutdown) { return; } transportShutdown = true; + listener.transportShutdown(s); + } + + public void notifyShutdown(Status s) { + notifyGracefulShutdown(s); + if (shutdownStatus != null) { + return; + } shutdownStatus = s; shutdownThrowable = s.asException(); - listener.transportShutdown(s); } public void notifyInUse(boolean inUse) { diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index f04b3e6cbf..677358f4fe 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -755,10 +755,21 @@ class NettyClientHandler extends AbstractNettyHandler { /** * Handler for a GOAWAY being received. Fails any streams created after the - * last known stream. + * last known stream. May only be called during a read. */ private void goingAway(Status status) { + lifecycleManager.notifyGracefulShutdown(status); + // Try to allocate as many in-flight streams as possible, to reduce race window of + // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to + // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING + // after the first GOAWAY, so they can very precisely detect when the GOAWAY has been + // processed and thus this processing must be in-line before processing additional reads. + + // This can cause reentrancy, but should be minor since it is normal to handle writes in + // response to a read. Also, the call stack is rather shallow at this point + clientWriteQueue.drainNow(); lifecycleManager.notifyShutdown(status); + final Status goAwayStatus = lifecycleManager.getShutdownStatus(); final int lastKnownStream = connection().local().lastStreamKnownByPeer(); try { diff --git a/netty/src/main/java/io/grpc/netty/WriteQueue.java b/netty/src/main/java/io/grpc/netty/WriteQueue.java index cfbfe758d6..f80e3fcaa5 100644 --- a/netty/src/main/java/io/grpc/netty/WriteQueue.java +++ b/netty/src/main/java/io/grpc/netty/WriteQueue.java @@ -101,6 +101,19 @@ class WriteQueue { } } + /** + * Executes enqueued work directly on the current thread. This can be used to trigger writes + * before performing additional reads. Must be called from the event loop. This method makes no + * guarantee that the work queue is empty when it returns. + */ + void drainNow() { + Preconditions.checkState(channel.eventLoop().inEventLoop(), "must be on the event loop"); + if (queue.peek() == null) { + return; + } + flush(); + } + /** * Process the queue of commands and dispatch them to the stream. This method is only * called in the event loop diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index dfdfcf4f9c..c5f81325bd 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -361,6 +361,19 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { } } + protected final WriteQueue writeQueue() { + return writeQueue; + } + protected final T handler() { return handler; }