mirror of https://github.com/grpc/grpc-java.git
inprocess: Fix listener race if transport is shutdown while starting
Returning the runnable did nothing, as both the start method and the runnable are run within the synchronization context. I believe the Runnable used to be required in the previous implementation of ManagedChannelImpl (the lock-based implementation before we created SynchronizationContext). This fixes a NPE seen in ServerImpl because the server expects proper ordering of transport lifecycle events. ``` Uncaught exception in the SynchronizationContext. Panic! java.lang.NullPointerException: Cannot invoke "java.util.concurrent.Future.cancel(boolean)" because "this.handshakeTimeoutFuture" is null at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.transportReady(ServerImpl.java:440) at io.grpc.inprocess.InProcessTransport$4.run(InProcessTransport.java:215) at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:94) ``` b/338445186
This commit is contained in:
parent
107fdb4b7c
commit
e4e7f3a068
|
@ -350,6 +350,26 @@ public abstract class AbstractTransportTest {
|
||||||
verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
|
verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void clientShutdownBeforeStartRunnable() throws Exception {
|
||||||
|
server.start(serverListener);
|
||||||
|
client = newClientTransport(server);
|
||||||
|
Runnable runnable = client.start(mockClientTransportListener);
|
||||||
|
// Shutdown before calling 'runnable'
|
||||||
|
client.shutdown(Status.UNAVAILABLE.withDescription("shutdown called"));
|
||||||
|
runIfNotNull(runnable);
|
||||||
|
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
|
||||||
|
// We should verify that clients don't call transportReady() after transportTerminated(), but
|
||||||
|
// transports do this today and nothing cares. ServerImpl, on the other hand, doesn't appreciate
|
||||||
|
// the out-of-order calls.
|
||||||
|
MockServerTransportListener serverTransportListener
|
||||||
|
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
|
||||||
|
assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
||||||
|
// Allow any status as some transports (e.g., Netty) don't communicate the original status when
|
||||||
|
// shutdown while handshaking. It won't be used anyway, so no big deal.
|
||||||
|
verify(mockClientTransportListener).transportShutdown(any(Status.class));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void clientStartAndStopOnceConnected() throws Exception {
|
public void clientStartAndStopOnceConnected() throws Exception {
|
||||||
server.start(serverListener);
|
server.start(serverListener);
|
||||||
|
@ -2251,6 +2271,7 @@ public abstract class AbstractTransportTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Attributes transportReady(Attributes attributes) {
|
public Attributes transportReady(Attributes attributes) {
|
||||||
|
assertFalse(terminated.isDone());
|
||||||
return Attributes.newBuilder()
|
return Attributes.newBuilder()
|
||||||
.setAll(attributes)
|
.setAll(attributes)
|
||||||
.set(ADDITIONAL_TRANSPORT_ATTR_KEY, "additional attribute value")
|
.set(ADDITIONAL_TRANSPORT_ATTR_KEY, "additional attribute value")
|
||||||
|
|
|
@ -203,21 +203,14 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
return new Runnable() {
|
Attributes serverTransportAttrs = Attributes.newBuilder()
|
||||||
@Override
|
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
|
||||||
@SuppressWarnings("deprecation")
|
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
|
||||||
public void run() {
|
.build();
|
||||||
synchronized (InProcessTransport.this) {
|
serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
|
||||||
Attributes serverTransportAttrs = Attributes.newBuilder()
|
attributes = clientTransportListener.filterTransport(attributes);
|
||||||
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
|
clientTransportListener.transportReady();
|
||||||
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
|
return null;
|
||||||
.build();
|
|
||||||
serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
|
|
||||||
attributes = clientTransportListener.filterTransport(attributes);
|
|
||||||
clientTransportListener.transportReady();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -262,4 +262,9 @@ public class JettyTransportTest extends AbstractTransportTest {
|
||||||
@Ignore("https://github.com/jetty/jetty.project/issues/11822")
|
@Ignore("https://github.com/jetty/jetty.project/issues/11822")
|
||||||
@Test
|
@Test
|
||||||
public void clientChecksInboundMetadataSize_trailer() {}
|
public void clientChecksInboundMetadataSize_trailer() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Ignore("Not yet investigated, but has been seen for multiple servlet containers")
|
||||||
|
@Test
|
||||||
|
public void clientShutdownBeforeStartRunnable() {}
|
||||||
}
|
}
|
||||||
|
|
|
@ -274,4 +274,9 @@ public class TomcatTransportTest extends AbstractTransportTest {
|
||||||
@Ignore("regression since bumping grpc v1.46 to v1.53")
|
@Ignore("regression since bumping grpc v1.46 to v1.53")
|
||||||
@Test
|
@Test
|
||||||
public void messageProducerOnlyProducesRequestedMessages() {}
|
public void messageProducerOnlyProducesRequestedMessages() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Ignore("Not yet investigated, but has been seen for multiple servlet containers")
|
||||||
|
@Test
|
||||||
|
public void clientShutdownBeforeStartRunnable() {}
|
||||||
}
|
}
|
||||||
|
|
|
@ -308,4 +308,9 @@ public class UndertowTransportTest extends AbstractTransportTest {
|
||||||
@Ignore("regression since bumping grpc v1.46 to v1.53")
|
@Ignore("regression since bumping grpc v1.46 to v1.53")
|
||||||
@Test
|
@Test
|
||||||
public void messageProducerOnlyProducesRequestedMessages() {}
|
public void messageProducerOnlyProducesRequestedMessages() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Ignore("Not yet investigated, but has been seen for multiple servlet containers")
|
||||||
|
@Test
|
||||||
|
public void clientShutdownBeforeStartRunnable() {}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue