core: two changes on DelayedClientTransport2. (#2505)

1. Use ChannelExecutor to run callbacks.  Now callbacks are no longer
run under the delayed transport lock.

2. Use explicit picker version instead of relying on identity equality
to detect new pickers.  Rationale: if reprocess() is called again with
the same picker, all pending streams will be reprocessed by this picker
again, thus there is no reason to leave out the racing new stream.
This commit is contained in:
Kun Zhang 2016-12-15 08:47:33 -08:00 committed by GitHub
parent aada0780b8
commit 946046abaf
2 changed files with 203 additions and 50 deletions

View File

@ -32,7 +32,6 @@
package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.CallOptions;
import io.grpc.Context;
@ -60,14 +59,17 @@ import javax.annotation.concurrent.GuardedBy;
* thus the delayed transport stops owning the stream.
*/
final class DelayedClientTransport2 implements ManagedClientTransport {
private final LogId lodId = LogId.allocate(getClass().getName());
private final Object lock = new Object();
private final Executor streamCreationExecutor;
private final Executor defaultAppExecutor;
private final ChannelExecutor channelExecutor;
private Listener listener;
private Runnable reportTransportInUse;
private Runnable reportTransportNotInUse;
private Runnable reportTransportShutdown;
private Runnable reportTransportTerminated;
@GuardedBy("lock")
private Collection<PendingStream> pendingStreams = new LinkedHashSet<PendingStream>();
@ -85,13 +87,50 @@ final class DelayedClientTransport2 implements ManagedClientTransport {
@Nullable
private SubchannelPicker lastPicker;
DelayedClientTransport2(Executor streamCreationExecutor) {
this.streamCreationExecutor = streamCreationExecutor;
@GuardedBy("lock")
private long lastPickerVersion;
/**
* Creates a new delayed transport.
*
* @param defaultAppExecutor pending streams will create real streams and run bufferred operations
* in an application executor, which will be this executor, unless there is on provided in
* {@link CallOptions}.
* @param channelExecutor all listener callbacks of the delayed transport will be run from this
* ChannelExecutor.
*/
DelayedClientTransport2(Executor defaultAppExecutor, ChannelExecutor channelExecutor) {
this.defaultAppExecutor = defaultAppExecutor;
this.channelExecutor = channelExecutor;
}
@Override
public final Runnable start(Listener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener");
public final Runnable start(final Listener listener) {
reportTransportInUse = new Runnable() {
@Override
public void run() {
listener.transportInUse(true);
}
};
reportTransportNotInUse = new Runnable() {
@Override
public void run() {
listener.transportInUse(false);
}
};
reportTransportShutdown = new Runnable() {
@Override
public void run() {
listener.transportShutdown(
Status.UNAVAILABLE.withDescription("Channel requested transport to shut down"));
}
};
reportTransportTerminated = new Runnable() {
@Override
public void run() {
listener.transportTerminated();
}
};
return null;
}
@ -105,39 +144,46 @@ final class DelayedClientTransport2 implements ManagedClientTransport {
@Override
public final ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers,
CallOptions callOptions, StatsTraceContext statsTraceCtx) {
SubchannelPicker picker = null;
synchronized (lock) {
if (!shutdown) {
if (lastPicker == null) {
return createPendingStream(method, headers, callOptions, statsTraceCtx);
}
picker = lastPicker;
}
}
if (picker != null) {
while (true) {
PickResult pickResult = picker.pickSubchannel(callOptions.getAffinity(), headers);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(
pickResult, callOptions.isWaitForReady());
if (transport != null) {
return transport.newStream(method, headers, callOptions, statsTraceCtx);
}
// This picker's conclusion is "buffer". If there hasn't been a newer picker set
// (possible race with reprocess()), we will buffer it. Otherwise, will try with the new
// picker.
synchronized (lock) {
if (shutdown) {
break;
}
if (picker == lastPicker) {
try {
SubchannelPicker picker = null;
long pickerVersion = -1;
synchronized (lock) {
if (!shutdown) {
if (lastPicker == null) {
return createPendingStream(method, headers, callOptions, statsTraceCtx);
}
picker = lastPicker;
pickerVersion = lastPickerVersion;
}
}
if (picker != null) {
while (true) {
PickResult pickResult = picker.pickSubchannel(callOptions.getAffinity(), headers);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(
pickResult, callOptions.isWaitForReady());
if (transport != null) {
return transport.newStream(method, headers, callOptions, statsTraceCtx);
}
// This picker's conclusion is "buffer". If there hasn't been a newer picker set
// (possible race with reprocess()), we will buffer it. Otherwise, will try with the new
// picker.
synchronized (lock) {
if (shutdown) {
break;
}
if (pickerVersion == lastPickerVersion) {
return createPendingStream(method, headers, callOptions, statsTraceCtx);
}
picker = lastPicker;
pickerVersion = lastPickerVersion;
}
}
}
return new FailingClientStream(Status.UNAVAILABLE.withDescription(
"Channel has shutdown (reported by delayed transport)"));
} finally {
channelExecutor.drain();
}
return new FailingClientStream(Status.UNAVAILABLE.withDescription(
"Channel has shutdown (reported by delayed transport)"));
}
@Override
@ -145,6 +191,8 @@ final class DelayedClientTransport2 implements ManagedClientTransport {
return newStream(method, headers, CallOptions.DEFAULT, StatsTraceContext.NOOP);
}
// Caller must call channelExecutor.drain() outside of lock because this method may schedule
// tasks on channelExecutor
@GuardedBy("lock")
private PendingStream createPendingStream(MethodDescriptor<?, ?> method, Metadata headers,
CallOptions callOptions, StatsTraceContext statsTraceCtx) {
@ -152,7 +200,7 @@ final class DelayedClientTransport2 implements ManagedClientTransport {
statsTraceCtx);
pendingStreams.add(pendingStream);
if (pendingStreams.size() == 1) {
listener.transportInUse(true);
channelExecutor.executeLater(reportTransportInUse);
}
return pendingStream;
}
@ -174,13 +222,13 @@ final class DelayedClientTransport2 implements ManagedClientTransport {
return;
}
shutdown = true;
listener.transportShutdown(
Status.UNAVAILABLE.withDescription("Channel requested transport to shut down"));
channelExecutor.executeLater(reportTransportShutdown);
if (pendingStreams == null || pendingStreams.isEmpty()) {
pendingStreams = null;
listener.transportTerminated();
channelExecutor.executeLater(reportTransportTerminated);
}
}
channelExecutor.drain();
}
/**
@ -201,7 +249,7 @@ final class DelayedClientTransport2 implements ManagedClientTransport {
for (PendingStream stream : savedPendingStreams) {
stream.cancel(status);
}
listener.transportTerminated();
channelExecutor.executeLater(reportTransportTerminated).drain();
}
// If savedPendingStreams == null, transportTerminated() has already been called in shutdown().
}
@ -224,18 +272,18 @@ final class DelayedClientTransport2 implements ManagedClientTransport {
* pick is successful, otherwise keep it pending.
*
* <p>This method may be called concurrently with {@code newStream()}, and it's safe. All pending
* streams will be served by the latest picker as soon as possible.
* streams will be served by the latest picker (if a same picker is given more than once, they are
* considered different pickers) as soon as possible.
*
* <p>This method <strong>must not</strong> be called concurrently, with itself or with {@link
* #setTransportSupplier}/{@link #setTransport}.
*
* @return the version number of the given picker.
*/
final void reprocess(SubchannelPicker picker) {
ArrayList<PendingStream> toProcess;
ArrayList<PendingStream> toRemove = new ArrayList<PendingStream>();
synchronized (lock) {
lastPicker = picker;
lastPickerVersion++;
if (pendingStreams == null || pendingStreams.isEmpty()) {
return;
}
@ -248,7 +296,7 @@ final class DelayedClientTransport2 implements ManagedClientTransport {
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(
pickResult, stream.callOptions.isWaitForReady());
if (transport != null) {
Executor executor = streamCreationExecutor;
Executor executor = defaultAppExecutor;
// createRealStream may be expensive. It will start real streams on the transport. If
// there are pending requests, they will be serialized too, which may be expensive. Since
// we are now on transport thread, we need to offload the work to an executor.
@ -279,10 +327,10 @@ final class DelayedClientTransport2 implements ManagedClientTransport {
// in-use state may be false. However, it shouldn't cause spurious switching to idleness
// (which would shutdown the transports and LoadBalancer) because the gap should be shorter
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
listener.transportInUse(false);
channelExecutor.executeLater(reportTransportNotInUse);
if (shutdown) {
pendingStreams = null;
listener.transportTerminated();
channelExecutor.executeLater(reportTransportTerminated);
} else {
// Because delayed transport is long-lived, we take this opportunity to down-size the
// hashmap.
@ -290,6 +338,7 @@ final class DelayedClientTransport2 implements ManagedClientTransport {
}
}
}
channelExecutor.drain();
}
// TODO(carl-mastrangelo): remove this once the Subchannel change is in.
@ -332,14 +381,15 @@ final class DelayedClientTransport2 implements ManagedClientTransport {
if (pendingStreams != null) {
boolean justRemovedAnElement = pendingStreams.remove(this);
if (pendingStreams.isEmpty() && justRemovedAnElement) {
listener.transportInUse(false);
channelExecutor.executeLater(reportTransportNotInUse);
if (shutdown) {
pendingStreams = null;
listener.transportTerminated();
channelExecutor.executeLater(reportTransportTerminated);
}
}
}
}
channelExecutor.drain();
}
}
}

View File

@ -37,10 +37,12 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.same;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@ -66,8 +68,13 @@ import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Unit tests for {@link DelayedClientTransport2}.
@ -98,6 +105,7 @@ public class DelayedClientTransport2Test {
private final Metadata headers = new Metadata();
private final Metadata headers2 = new Metadata();
private final Metadata headers3 = new Metadata();
private final CallOptions callOptions = CallOptions.DEFAULT.withAuthority("dummy_value");
private final CallOptions callOptions2 = CallOptions.DEFAULT.withAuthority("dummy_value2");
@ -110,8 +118,8 @@ public class DelayedClientTransport2Test {
private final FakeClock fakeExecutor = new FakeClock();
private final DelayedClientTransport2 delayedTransport =
new DelayedClientTransport2(fakeExecutor.getScheduledExecutorService());
private final DelayedClientTransport2 delayedTransport = new DelayedClientTransport2(
fakeExecutor.getScheduledExecutorService(), new ChannelExecutor());
@Before public void setUp() {
MockitoAnnotations.initMocks(this);
@ -462,4 +470,99 @@ public class DelayedClientTransport2Test {
verify(subchannel).obtainActiveTransport();
assertSame(mockRealStream, stream);
}
@Test
public void reprocess_newStreamRacesWithReprocess() throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(2);
// In both phases, we only expect the first pickSubchannel() call to block on the barrier.
final AtomicBoolean nextPickShouldWait = new AtomicBoolean(true);
///////// Phase 1: reprocess() twice with the same picker
SubchannelPicker picker = mock(SubchannelPicker.class);
doAnswer(new Answer<PickResult>() {
@Override
public PickResult answer(InvocationOnMock invocation) throws Throwable {
if (nextPickShouldWait.compareAndSet(true, false)) {
try {
barrier.await();
return PickResult.withNoResult();
} catch (Exception e) {
e.printStackTrace();
}
}
return PickResult.withNoResult();
}
}).when(picker).pickSubchannel(any(Attributes.class), any(Metadata.class));
// Because there is no pending stream yet, it will do nothing but save the picker.
delayedTransport.reprocess(picker);
verify(picker, never()).pickSubchannel(any(Attributes.class), any(Metadata.class));
Thread sideThread = new Thread("sideThread") {
@Override
public void run() {
// Will call pickSubchannel and wait on barrier
delayedTransport.newStream(method, headers, callOptions, statsTraceCtx);
}
};
sideThread.start();
// Is called from sideThread
verify(picker, timeout(5000)).pickSubchannel(callOptions.getAffinity(), headers);
// Because stream has not been buffered (it's still stuck in newStream()), this will do nothing,
// but incrementing the picker version.
delayedTransport.reprocess(picker);
verify(picker).pickSubchannel(callOptions.getAffinity(), headers);
// Now let the stuck newStream() through
barrier.await(5, TimeUnit.SECONDS);
sideThread.join(5000);
assertFalse("sideThread should've exited", sideThread.isAlive());
// newStream() detects that there has been a new picker while it's stuck, thus will pick again.
verify(picker, times(2)).pickSubchannel(callOptions.getAffinity(), headers);
barrier.reset();
nextPickShouldWait.set(true);
////////// Phase 2: reprocess() with a different picker
// Create the second stream
Thread sideThread2 = new Thread("sideThread2") {
@Override
public void run() {
// Will call pickSubchannel and wait on barrier
delayedTransport.newStream(method, headers2, callOptions, statsTraceCtx);
}
};
sideThread2.start();
// The second stream will see the first picker
verify(picker, timeout(5000)).pickSubchannel(callOptions.getAffinity(), headers2);
// While the first stream won't use the first picker any more.
verify(picker, times(2)).pickSubchannel(callOptions.getAffinity(), headers);
// Now use a different picker
SubchannelPicker picker2 = mock(SubchannelPicker.class);
when(picker2.pickSubchannel(any(Attributes.class), any(Metadata.class)))
.thenReturn(PickResult.withNoResult());
delayedTransport.reprocess(picker2);
// The pending first stream uses the new picker
verify(picker2).pickSubchannel(callOptions.getAffinity(), headers);
// The second stream is still pending in creation, doesn't use the new picker.
verify(picker2, never()).pickSubchannel(callOptions.getAffinity(), headers2);
// Now let the second stream finish creation
barrier.await(5, TimeUnit.SECONDS);
sideThread2.join(5000);
assertFalse("sideThread2 should've exited", sideThread2.isAlive());
// The second stream should see the new picker
verify(picker2, timeout(5000)).pickSubchannel(callOptions.getAffinity(), headers2);
// Wrapping up
verify(picker, times(2)).pickSubchannel(callOptions.getAffinity(), headers);
verify(picker).pickSubchannel(callOptions.getAffinity(), headers2);
verify(picker2).pickSubchannel(callOptions.getAffinity(), headers);
verify(picker2).pickSubchannel(callOptions.getAffinity(), headers2);
}
}