From e4e7f3a068b0edf9404cad8cc6b3eac70ee254c8 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Tue, 28 May 2024 15:47:15 -0700 Subject: [PATCH] inprocess: Fix listener race if transport is shutdown while starting Returning the runnable did nothing, as both the start method and the runnable are run within the synchronization context. I believe the Runnable used to be required in the previous implementation of ManagedChannelImpl (the lock-based implementation before we created SynchronizationContext). This fixes a NPE seen in ServerImpl because the server expects proper ordering of transport lifecycle events. ``` Uncaught exception in the SynchronizationContext. Panic! java.lang.NullPointerException: Cannot invoke "java.util.concurrent.Future.cancel(boolean)" because "this.handshakeTimeoutFuture" is null at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.transportReady(ServerImpl.java:440) at io.grpc.inprocess.InProcessTransport$4.run(InProcessTransport.java:215) at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:94) ``` b/338445186 --- .../grpc/internal/AbstractTransportTest.java | 21 +++++++++++++++++ .../io/grpc/inprocess/InProcessTransport.java | 23 +++++++------------ .../io/grpc/servlet/JettyTransportTest.java | 5 ++++ .../io/grpc/servlet/TomcatTransportTest.java | 5 ++++ .../grpc/servlet/UndertowTransportTest.java | 5 ++++ 5 files changed, 44 insertions(+), 15 deletions(-) diff --git a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java index 57d870575d..103c12475b 100644 --- a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java @@ -350,6 +350,26 @@ public abstract class AbstractTransportTest { verify(mockClientTransportListener, never()).transportInUse(anyBoolean()); } + @Test + public void clientShutdownBeforeStartRunnable() throws Exception { + server.start(serverListener); + client = newClientTransport(server); + Runnable runnable = client.start(mockClientTransportListener); + // Shutdown before calling 'runnable' + client.shutdown(Status.UNAVAILABLE.withDescription("shutdown called")); + runIfNotNull(runnable); + verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); + // We should verify that clients don't call transportReady() after transportTerminated(), but + // transports do this today and nothing cares. ServerImpl, on the other hand, doesn't appreciate + // the out-of-order calls. + MockServerTransportListener serverTransportListener + = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + // Allow any status as some transports (e.g., Netty) don't communicate the original status when + // shutdown while handshaking. It won't be used anyway, so no big deal. + verify(mockClientTransportListener).transportShutdown(any(Status.class)); + } + @Test public void clientStartAndStopOnceConnected() throws Exception { server.start(serverListener); @@ -2251,6 +2271,7 @@ public abstract class AbstractTransportTest { @Override public Attributes transportReady(Attributes attributes) { + assertFalse(terminated.isDone()); return Attributes.newBuilder() .setAll(attributes) .set(ADDITIONAL_TRANSPORT_ATTR_KEY, "additional attribute value") diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java index 01f6a07894..ae8ad143d2 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -203,21 +203,14 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans } }; } - return new Runnable() { - @Override - @SuppressWarnings("deprecation") - public void run() { - synchronized (InProcessTransport.this) { - Attributes serverTransportAttrs = Attributes.newBuilder() - .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address) - .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address) - .build(); - serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs); - attributes = clientTransportListener.filterTransport(attributes); - clientTransportListener.transportReady(); - } - } - }; + Attributes serverTransportAttrs = Attributes.newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address) + .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address) + .build(); + serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs); + attributes = clientTransportListener.filterTransport(attributes); + clientTransportListener.transportReady(); + return null; } @Override diff --git a/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java b/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java index e9cb391ea0..d1fc90752a 100644 --- a/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java +++ b/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java @@ -262,4 +262,9 @@ public class JettyTransportTest extends AbstractTransportTest { @Ignore("https://github.com/jetty/jetty.project/issues/11822") @Test public void clientChecksInboundMetadataSize_trailer() {} + + @Override + @Ignore("Not yet investigated, but has been seen for multiple servlet containers") + @Test + public void clientShutdownBeforeStartRunnable() {} } diff --git a/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatTransportTest.java b/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatTransportTest.java index 262036883a..21a1d3db7e 100644 --- a/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatTransportTest.java +++ b/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatTransportTest.java @@ -274,4 +274,9 @@ public class TomcatTransportTest extends AbstractTransportTest { @Ignore("regression since bumping grpc v1.46 to v1.53") @Test public void messageProducerOnlyProducesRequestedMessages() {} + + @Override + @Ignore("Not yet investigated, but has been seen for multiple servlet containers") + @Test + public void clientShutdownBeforeStartRunnable() {} } diff --git a/servlet/src/undertowTest/java/io/grpc/servlet/UndertowTransportTest.java b/servlet/src/undertowTest/java/io/grpc/servlet/UndertowTransportTest.java index e14c11985d..76dfb0c0fd 100644 --- a/servlet/src/undertowTest/java/io/grpc/servlet/UndertowTransportTest.java +++ b/servlet/src/undertowTest/java/io/grpc/servlet/UndertowTransportTest.java @@ -308,4 +308,9 @@ public class UndertowTransportTest extends AbstractTransportTest { @Ignore("regression since bumping grpc v1.46 to v1.53") @Test public void messageProducerOnlyProducesRequestedMessages() {} + + @Override + @Ignore("Not yet investigated, but has been seen for multiple servlet containers") + @Test + public void clientShutdownBeforeStartRunnable() {} }