diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index e2181766af..5c085c20c2 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -62,6 +62,7 @@ import java.util.concurrent.Executor; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.CheckReturnValue; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; @@ -89,8 +90,9 @@ class InProcessTransport implements ServerTransport, ConnectionClientTransport { .build(); } + @CheckReturnValue @Override - public synchronized void start(ManagedClientTransport.Listener listener) { + public synchronized Runnable start(ManagedClientTransport.Listener listener) { this.clientTransportListener = listener; InProcessServer server = InProcessServer.findServer(name); if (server != null) { @@ -99,7 +101,7 @@ class InProcessTransport implements ServerTransport, ConnectionClientTransport { if (serverTransportListener == null) { shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + name); final Status localShutdownStatus = shutdownStatus; - Thread shutdownThread = new Thread(new Runnable() { + return new Runnable() { @Override public void run() { synchronized (InProcessTransport.this) { @@ -107,23 +109,16 @@ class InProcessTransport implements ServerTransport, ConnectionClientTransport { notifyTerminated(); } } - }); - shutdownThread.setDaemon(true); - shutdownThread.setName("grpc-inprocess-shutdown"); - shutdownThread.start(); - return; + }; } - Thread readyThread = new Thread(new Runnable() { + return new Runnable() { @Override public void run() { synchronized (InProcessTransport.this) { clientTransportListener.transportReady(); } } - }); - readyThread.setDaemon(true); - readyThread.setName("grpc-inprocess-ready"); - readyThread.start(); + }; } @Override diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index a99395bc0e..dfabfeeffb 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -96,8 +96,9 @@ class DelayedClientTransport implements ManagedClientTransport { } @Override - public void start(Listener listener) { + public Runnable start(Listener listener) { this.listener = Preconditions.checkNotNull(listener, "listener"); + return null; } /** diff --git a/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java b/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java index 9f973c2152..b79feb6d71 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java +++ b/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java @@ -41,8 +41,8 @@ import java.util.concurrent.Executor; abstract class ForwardingConnectionClientTransport implements ConnectionClientTransport { @Override - public void start(Listener listener) { - delegate().start(listener); + public Runnable start(Listener listener) { + return delegate().start(listener); } @Override diff --git a/core/src/main/java/io/grpc/internal/ManagedClientTransport.java b/core/src/main/java/io/grpc/internal/ManagedClientTransport.java index a1fbf97bf4..cbe2c088c9 100644 --- a/core/src/main/java/io/grpc/internal/ManagedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/ManagedClientTransport.java @@ -33,6 +33,8 @@ package io.grpc.internal; import io.grpc.Status; +import javax.annotation.CheckReturnValue; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; /** @@ -53,12 +55,16 @@ public interface ManagedClientTransport extends ClientTransport, WithLogId { * Starts transport. This method may only be called once. * *

Implementations must not call {@code listener} from within {@link #start}; implementations - * are expected to notify listener on a separate thread. This method should not throw any - * exceptions. + * are expected to notify listener on a separate thread or when the returned {@link Runnable} is + * run. This method and the returned {@code Runnable} should not throw any exceptions. * * @param listener non-{@code null} listener of transport events + * @return a {@link Runnable} that is executed after-the-fact by the original caller, typically + * after locks are released */ - void start(Listener listener); + @CheckReturnValue + @Nullable + Runnable start(Listener listener); /** * Initiates an orderly shutdown of the transport. Existing streams continue, but the transport diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java index ca43752629..dc04c94975 100644 --- a/core/src/main/java/io/grpc/internal/TransportSet.java +++ b/core/src/main/java/io/grpc/internal/TransportSet.java @@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; @@ -173,25 +174,32 @@ final class TransportSet implements WithLogId { if (savedTransport != null) { return savedTransport; } + Runnable runnable; synchronized (lock) { // Check again, since it could have changed before acquiring the lock - if (activeTransport == null) { - if (shutdown) { - return SHUTDOWN_TRANSPORT; - } - // Transition to CONNECTING - DelayedClientTransport delayedTransport = new DelayedClientTransport(appExecutor); - transports.add(delayedTransport); - delayedTransport.start(new BaseTransportListener(delayedTransport)); - activeTransport = delayedTransport; - startNewTransport(delayedTransport); + savedTransport = activeTransport; + if (savedTransport != null) { + return savedTransport; } - return activeTransport; + if (shutdown) { + return SHUTDOWN_TRANSPORT; + } + // Transition to CONNECTING + DelayedClientTransport delayedTransport = new DelayedClientTransport(appExecutor); + transports.add(delayedTransport); + delayedTransport.start(new BaseTransportListener(delayedTransport)); + savedTransport = activeTransport = delayedTransport; + runnable = startNewTransport(delayedTransport); } + if (runnable != null) { + runnable.run(); + } + return savedTransport; } + @CheckReturnValue @GuardedBy("lock") - private void startNewTransport(DelayedClientTransport delayedTransport) { + private Runnable startNewTransport(DelayedClientTransport delayedTransport) { Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled"); if (nextAddressIndex == 0) { @@ -211,7 +219,7 @@ final class TransportSet implements WithLogId { } pendingTransport = transport; transports.add(transport); - transport.start(new TransportListener(transport, delayedTransport, address)); + return transport.start(new TransportListener(transport, delayedTransport, address)); } /** @@ -239,17 +247,21 @@ final class TransportSet implements WithLogId { try { delayedTransport.endBackoff(); boolean shutdownDelayedTransport = false; + Runnable runnable = null; synchronized (lock) { reconnectTask = null; if (delayedTransport.hasPendingStreams()) { // Transition directly to CONNECTING - startNewTransport(delayedTransport); + runnable = startNewTransport(delayedTransport); } else { // Transition to IDLE (or already SHUTDOWN) activeTransport = null; shutdownDelayedTransport = true; } } + if (runnable != null) { + runnable.run(); + } if (shutdownDelayedTransport) { delayedTransport.setTransportSupplier(new Supplier() { @Override @@ -425,6 +437,7 @@ final class TransportSet implements WithLogId { new Object[] {getLogId(), transport.getLogId(), address, s}); } super.transportShutdown(s); + Runnable runnable = null; synchronized (lock) { if (activeTransport == transport) { // This is true only if the transport was ready. @@ -440,10 +453,13 @@ final class TransportSet implements WithLogId { scheduleBackoff(delayedTransport, s); } else { // Still CONNECTING - startNewTransport(delayedTransport); + runnable = startNewTransport(delayedTransport); } } } + if (runnable != null) { + runnable.run(); + } loadBalancer.handleTransportShutdown(addressGroup, s); if (allAddressesFailed) { callback.onAllAddressesFailed(); diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 0d201ccf13..94c92c9b78 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -133,7 +133,7 @@ class NettyClientTransport implements ConnectionClientTransport { } @Override - public void start(Listener transportListener) { + public Runnable start(Listener transportListener) { lifecycleManager = new ClientTransportLifecycleManager( Preconditions.checkNotNull(transportListener, "listener")); @@ -191,6 +191,7 @@ class NettyClientTransport implements ConnectionClientTransport { Status.INTERNAL.withDescription("Connection closed with unknown cause")); } }); + return null; } @Override diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 544851a4a2..f43714aeda 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -344,7 +344,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { } @Override - public void start(Listener listener) { + public Runnable start(Listener listener) { this.listener = Preconditions.checkNotNull(listener, "listener"); if (enableKeepAlive) { @@ -433,6 +433,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { } } }); + return null; } @Override diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index b185fde0fa..41732e878b 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -182,7 +182,7 @@ public abstract class AbstractTransportTest { public void frameAfterRstStreamShouldNotBreakClientChannel() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -232,7 +232,7 @@ public abstract class AbstractTransportTest { server = null; InOrder inOrder = inOrder(mockClientTransportListener); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); inOrder.verify(mockClientTransportListener).transportShutdown(statusCaptor.capture()); assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue()); @@ -246,7 +246,7 @@ public abstract class AbstractTransportTest { server.start(serverListener); client = newClientTransport(server); InOrder inOrder = inOrder(mockClientTransportListener); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); client.shutdown(); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); inOrder.verify(mockClientTransportListener).transportShutdown(statusCaptor.capture()); @@ -260,7 +260,7 @@ public abstract class AbstractTransportTest { server.start(serverListener); client = newClientTransport(server); InOrder inOrder = inOrder(mockClientTransportListener); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); @@ -288,7 +288,7 @@ public abstract class AbstractTransportTest { public void openStreamPreventsTermination() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -339,7 +339,7 @@ public abstract class AbstractTransportTest { public void shutdownNowKillsClientStream() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -370,7 +370,7 @@ public abstract class AbstractTransportTest { public void shutdownNowKillsServerStream() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -403,7 +403,7 @@ public abstract class AbstractTransportTest { public void ping() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class); try { client.ping(mockPingCallback, MoreExecutors.directExecutor()); @@ -419,7 +419,7 @@ public abstract class AbstractTransportTest { public void ping_duringShutdown() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); // Stream prevents termination ClientStream stream = client.newStream(methodDescriptor, new Metadata()); stream.start(mockClientStreamListener); @@ -440,7 +440,7 @@ public abstract class AbstractTransportTest { public void ping_afterTermination() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); client.shutdown(); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); @@ -466,7 +466,7 @@ public abstract class AbstractTransportTest { public void newStream_duringShutdown() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); // Stream prevents termination ClientStream stream = client.newStream(methodDescriptor, new Metadata()); stream.start(mockClientStreamListener); @@ -497,7 +497,7 @@ public abstract class AbstractTransportTest { // dealing with afterTermination is harder than duringShutdown. server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); client.shutdown(); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); @@ -514,7 +514,7 @@ public abstract class AbstractTransportTest { public void transportInUse_normalClose() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); ClientStream stream1 = client.newStream(methodDescriptor, new Metadata()); stream1.start(mockClientStreamListener); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true); @@ -542,7 +542,7 @@ public abstract class AbstractTransportTest { public void transportInUse_clientCancel() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); ClientStream stream1 = client.newStream(methodDescriptor, new Metadata()); stream1.start(mockClientStreamListener); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true); @@ -562,7 +562,7 @@ public abstract class AbstractTransportTest { public void basicStream() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -640,7 +640,7 @@ public abstract class AbstractTransportTest { public void zeroMessageStream() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -672,7 +672,7 @@ public abstract class AbstractTransportTest { public void earlyServerClose_withServerHeaders() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -702,7 +702,7 @@ public abstract class AbstractTransportTest { public void earlyServerClose_noServerHeaders() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -739,7 +739,7 @@ public abstract class AbstractTransportTest { public void earlyServerClose_serverFailure() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -767,7 +767,7 @@ public abstract class AbstractTransportTest { public void clientCancel() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -798,7 +798,7 @@ public abstract class AbstractTransportTest { public void clientCancelFromWithinMessageRead() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -852,7 +852,7 @@ public abstract class AbstractTransportTest { public void serverCancel() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -889,7 +889,7 @@ public abstract class AbstractTransportTest { public void flowControlPushBack() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1052,7 +1052,7 @@ public abstract class AbstractTransportTest { */ private void doPingPong(MockServerListener serverListener) throws InterruptedException { ManagedClientTransport client = newClientTransport(server); - client.start(mock(ManagedClientTransport.Listener.class)); + runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata()); ClientStreamListener mockClientStreamListener = mock(ClientStreamListener.class); clientStream.start(mockClientStreamListener); @@ -1100,6 +1100,12 @@ public abstract class AbstractTransportTest { return true; } + private static void runIfNotNull(Runnable runnable) { + if (runnable != null) { + runnable.run(); + } + } + private static class MockServerListener implements ServerListener { public final BlockingQueue listeners = new LinkedBlockingQueue();