inprocess: Avoid creating unnecessary threads

Implementations of ManagedClientTransport.start() are restricted from
calling the passed listener until start() returns, in order to avoid
reentrency problems with locks. For most transports this isn't a
problem, because they need additional threads anyway. InProcess uses no
additional threads naturally so ends up needing a thread just to
notifyReady. Now transports can just return a Runnable that can be run
after locks are dropped.

This was originally intended to be a performance optimization, but the
thread also causes nondeterminism because RPCs are delayed until
notifyReady is called. So avoiding the thread reduces needless fakes
during tests.
This commit is contained in:
Eric Anderson 2016-07-31 19:15:40 -07:00
parent a8700a7837
commit 7d464fcb02
8 changed files with 85 additions and 59 deletions

View File

@ -62,6 +62,7 @@ import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
@ -89,8 +90,9 @@ class InProcessTransport implements ServerTransport, ConnectionClientTransport {
.build();
}
@CheckReturnValue
@Override
public synchronized void start(ManagedClientTransport.Listener listener) {
public synchronized Runnable start(ManagedClientTransport.Listener listener) {
this.clientTransportListener = listener;
InProcessServer server = InProcessServer.findServer(name);
if (server != null) {
@ -99,7 +101,7 @@ class InProcessTransport implements ServerTransport, ConnectionClientTransport {
if (serverTransportListener == null) {
shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + name);
final Status localShutdownStatus = shutdownStatus;
Thread shutdownThread = new Thread(new Runnable() {
return new Runnable() {
@Override
public void run() {
synchronized (InProcessTransport.this) {
@ -107,23 +109,16 @@ class InProcessTransport implements ServerTransport, ConnectionClientTransport {
notifyTerminated();
}
}
});
shutdownThread.setDaemon(true);
shutdownThread.setName("grpc-inprocess-shutdown");
shutdownThread.start();
return;
};
}
Thread readyThread = new Thread(new Runnable() {
return new Runnable() {
@Override
public void run() {
synchronized (InProcessTransport.this) {
clientTransportListener.transportReady();
}
}
});
readyThread.setDaemon(true);
readyThread.setName("grpc-inprocess-ready");
readyThread.start();
};
}
@Override

View File

@ -96,8 +96,9 @@ class DelayedClientTransport implements ManagedClientTransport {
}
@Override
public void start(Listener listener) {
public Runnable start(Listener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener");
return null;
}
/**

View File

@ -41,8 +41,8 @@ import java.util.concurrent.Executor;
abstract class ForwardingConnectionClientTransport implements ConnectionClientTransport {
@Override
public void start(Listener listener) {
delegate().start(listener);
public Runnable start(Listener listener) {
return delegate().start(listener);
}
@Override

View File

@ -33,6 +33,8 @@ package io.grpc.internal;
import io.grpc.Status;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
/**
@ -53,12 +55,16 @@ public interface ManagedClientTransport extends ClientTransport, WithLogId {
* Starts transport. This method may only be called once.
*
* <p>Implementations must not call {@code listener} from within {@link #start}; implementations
* are expected to notify listener on a separate thread. This method should not throw any
* exceptions.
* are expected to notify listener on a separate thread or when the returned {@link Runnable} is
* run. This method and the returned {@code Runnable} should not throw any exceptions.
*
* @param listener non-{@code null} listener of transport events
* @return a {@link Runnable} that is executed after-the-fact by the original caller, typically
* after locks are released
*/
void start(Listener listener);
@CheckReturnValue
@Nullable
Runnable start(Listener listener);
/**
* Initiates an orderly shutdown of the transport. Existing streams continue, but the transport

View File

@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
@ -173,25 +174,32 @@ final class TransportSet implements WithLogId {
if (savedTransport != null) {
return savedTransport;
}
Runnable runnable;
synchronized (lock) {
// Check again, since it could have changed before acquiring the lock
if (activeTransport == null) {
if (shutdown) {
return SHUTDOWN_TRANSPORT;
}
// Transition to CONNECTING
DelayedClientTransport delayedTransport = new DelayedClientTransport(appExecutor);
transports.add(delayedTransport);
delayedTransport.start(new BaseTransportListener(delayedTransport));
activeTransport = delayedTransport;
startNewTransport(delayedTransport);
savedTransport = activeTransport;
if (savedTransport != null) {
return savedTransport;
}
return activeTransport;
if (shutdown) {
return SHUTDOWN_TRANSPORT;
}
// Transition to CONNECTING
DelayedClientTransport delayedTransport = new DelayedClientTransport(appExecutor);
transports.add(delayedTransport);
delayedTransport.start(new BaseTransportListener(delayedTransport));
savedTransport = activeTransport = delayedTransport;
runnable = startNewTransport(delayedTransport);
}
if (runnable != null) {
runnable.run();
}
return savedTransport;
}
@CheckReturnValue
@GuardedBy("lock")
private void startNewTransport(DelayedClientTransport delayedTransport) {
private Runnable startNewTransport(DelayedClientTransport delayedTransport) {
Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
if (nextAddressIndex == 0) {
@ -211,7 +219,7 @@ final class TransportSet implements WithLogId {
}
pendingTransport = transport;
transports.add(transport);
transport.start(new TransportListener(transport, delayedTransport, address));
return transport.start(new TransportListener(transport, delayedTransport, address));
}
/**
@ -239,17 +247,21 @@ final class TransportSet implements WithLogId {
try {
delayedTransport.endBackoff();
boolean shutdownDelayedTransport = false;
Runnable runnable = null;
synchronized (lock) {
reconnectTask = null;
if (delayedTransport.hasPendingStreams()) {
// Transition directly to CONNECTING
startNewTransport(delayedTransport);
runnable = startNewTransport(delayedTransport);
} else {
// Transition to IDLE (or already SHUTDOWN)
activeTransport = null;
shutdownDelayedTransport = true;
}
}
if (runnable != null) {
runnable.run();
}
if (shutdownDelayedTransport) {
delayedTransport.setTransportSupplier(new Supplier<ClientTransport>() {
@Override
@ -425,6 +437,7 @@ final class TransportSet implements WithLogId {
new Object[] {getLogId(), transport.getLogId(), address, s});
}
super.transportShutdown(s);
Runnable runnable = null;
synchronized (lock) {
if (activeTransport == transport) {
// This is true only if the transport was ready.
@ -440,10 +453,13 @@ final class TransportSet implements WithLogId {
scheduleBackoff(delayedTransport, s);
} else {
// Still CONNECTING
startNewTransport(delayedTransport);
runnable = startNewTransport(delayedTransport);
}
}
}
if (runnable != null) {
runnable.run();
}
loadBalancer.handleTransportShutdown(addressGroup, s);
if (allAddressesFailed) {
callback.onAllAddressesFailed();

View File

@ -133,7 +133,7 @@ class NettyClientTransport implements ConnectionClientTransport {
}
@Override
public void start(Listener transportListener) {
public Runnable start(Listener transportListener) {
lifecycleManager = new ClientTransportLifecycleManager(
Preconditions.checkNotNull(transportListener, "listener"));
@ -191,6 +191,7 @@ class NettyClientTransport implements ConnectionClientTransport {
Status.INTERNAL.withDescription("Connection closed with unknown cause"));
}
});
return null;
}
@Override

View File

@ -344,7 +344,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
}
@Override
public void start(Listener listener) {
public Runnable start(Listener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener");
if (enableKeepAlive) {
@ -433,6 +433,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
}
}
});
return null;
}
@Override

View File

@ -182,7 +182,7 @@ public abstract class AbstractTransportTest {
public void frameAfterRstStreamShouldNotBreakClientChannel() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -232,7 +232,7 @@ public abstract class AbstractTransportTest {
server = null;
InOrder inOrder = inOrder(mockClientTransportListener);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
inOrder.verify(mockClientTransportListener).transportShutdown(statusCaptor.capture());
assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue());
@ -246,7 +246,7 @@ public abstract class AbstractTransportTest {
server.start(serverListener);
client = newClientTransport(server);
InOrder inOrder = inOrder(mockClientTransportListener);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
client.shutdown();
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
inOrder.verify(mockClientTransportListener).transportShutdown(statusCaptor.capture());
@ -260,7 +260,7 @@ public abstract class AbstractTransportTest {
server.start(serverListener);
client = newClientTransport(server);
InOrder inOrder = inOrder(mockClientTransportListener);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
@ -288,7 +288,7 @@ public abstract class AbstractTransportTest {
public void openStreamPreventsTermination() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -339,7 +339,7 @@ public abstract class AbstractTransportTest {
public void shutdownNowKillsClientStream() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -370,7 +370,7 @@ public abstract class AbstractTransportTest {
public void shutdownNowKillsServerStream() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -403,7 +403,7 @@ public abstract class AbstractTransportTest {
public void ping() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class);
try {
client.ping(mockPingCallback, MoreExecutors.directExecutor());
@ -419,7 +419,7 @@ public abstract class AbstractTransportTest {
public void ping_duringShutdown() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
// Stream prevents termination
ClientStream stream = client.newStream(methodDescriptor, new Metadata());
stream.start(mockClientStreamListener);
@ -440,7 +440,7 @@ public abstract class AbstractTransportTest {
public void ping_afterTermination() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
client.shutdown();
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
@ -466,7 +466,7 @@ public abstract class AbstractTransportTest {
public void newStream_duringShutdown() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
// Stream prevents termination
ClientStream stream = client.newStream(methodDescriptor, new Metadata());
stream.start(mockClientStreamListener);
@ -497,7 +497,7 @@ public abstract class AbstractTransportTest {
// dealing with afterTermination is harder than duringShutdown.
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
client.shutdown();
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
@ -514,7 +514,7 @@ public abstract class AbstractTransportTest {
public void transportInUse_normalClose() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
ClientStream stream1 = client.newStream(methodDescriptor, new Metadata());
stream1.start(mockClientStreamListener);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true);
@ -542,7 +542,7 @@ public abstract class AbstractTransportTest {
public void transportInUse_clientCancel() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
ClientStream stream1 = client.newStream(methodDescriptor, new Metadata());
stream1.start(mockClientStreamListener);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true);
@ -562,7 +562,7 @@ public abstract class AbstractTransportTest {
public void basicStream() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -640,7 +640,7 @@ public abstract class AbstractTransportTest {
public void zeroMessageStream() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -672,7 +672,7 @@ public abstract class AbstractTransportTest {
public void earlyServerClose_withServerHeaders() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -702,7 +702,7 @@ public abstract class AbstractTransportTest {
public void earlyServerClose_noServerHeaders() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -739,7 +739,7 @@ public abstract class AbstractTransportTest {
public void earlyServerClose_serverFailure() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -767,7 +767,7 @@ public abstract class AbstractTransportTest {
public void clientCancel() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -798,7 +798,7 @@ public abstract class AbstractTransportTest {
public void clientCancelFromWithinMessageRead() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -852,7 +852,7 @@ public abstract class AbstractTransportTest {
public void serverCancel() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -889,7 +889,7 @@ public abstract class AbstractTransportTest {
public void flowControlPushBack() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
client.start(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -1052,7 +1052,7 @@ public abstract class AbstractTransportTest {
*/
private void doPingPong(MockServerListener serverListener) throws InterruptedException {
ManagedClientTransport client = newClientTransport(server);
client.start(mock(ManagedClientTransport.Listener.class));
runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata());
ClientStreamListener mockClientStreamListener = mock(ClientStreamListener.class);
clientStream.start(mockClientStreamListener);
@ -1100,6 +1100,12 @@ public abstract class AbstractTransportTest {
return true;
}
private static void runIfNotNull(Runnable runnable) {
if (runnable != null) {
runnable.run();
}
}
private static class MockServerListener implements ServerListener {
public final BlockingQueue<MockServerTransportListener> listeners
= new LinkedBlockingQueue<MockServerTransportListener>();