core: pass status to ManagedClientTransport.shutdown() (#3351)

This aligns with shutdownNow(), which is already accepting a status.
The status will be propagated to application when RPCs failed because
of transport shutdown, which will become useful information for debug.
This commit is contained in:
Kun Zhang 2017-08-16 10:23:07 -07:00 committed by GitHub
parent ca7685ef50
commit 41410345e6
16 changed files with 153 additions and 135 deletions

View File

@ -156,24 +156,29 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
}
@Override
public synchronized void shutdown() {
public synchronized void shutdown(Status reason) {
// Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
if (shutdown) {
return;
}
shutdownStatus = Status.UNAVAILABLE.withDescription("transport was requested to shut down");
notifyShutdown(shutdownStatus);
shutdownStatus = reason;
notifyShutdown(reason);
if (streams.isEmpty()) {
notifyTerminated();
}
}
@Override
public synchronized void shutdown() {
shutdown(Status.UNAVAILABLE.withDescription("InProcessTransport shutdown by the server-side"));
}
@Override
public void shutdownNow(Status reason) {
checkNotNull(reason, "reason");
List<InProcessStream> streamsCopy;
synchronized (this) {
shutdown();
shutdown(reason);
if (terminated) {
return;
}

View File

@ -51,17 +51,18 @@ final class DelayedClientTransport implements ManagedClientTransport {
private Runnable reportTransportInUse;
private Runnable reportTransportNotInUse;
private Runnable reportTransportShutdown;
private Runnable reportTransportTerminated;
private Listener listener;
@GuardedBy("lock")
private Collection<PendingStream> pendingStreams = new LinkedHashSet<PendingStream>();
/**
* When shutdown == true and pendingStreams == null, then the transport is considered terminated.
* When shutdownStatus != null and pendingStreams == null, then the transport is considered
* terminated.
*/
@GuardedBy("lock")
private boolean shutdown;
private Status shutdownStatus;
/**
* The last picker that {@link #reprocess} has used.
@ -89,6 +90,7 @@ final class DelayedClientTransport implements ManagedClientTransport {
@Override
public final Runnable start(final Listener listener) {
this.listener = listener;
reportTransportInUse = new Runnable() {
@Override
public void run() {
@ -101,13 +103,6 @@ final class DelayedClientTransport implements ManagedClientTransport {
listener.transportInUse(false);
}
};
reportTransportShutdown = new Runnable() {
@Override
public void run() {
listener.transportShutdown(
Status.UNAVAILABLE.withDescription("Channel requested transport to shut down"));
}
};
reportTransportTerminated = new Runnable() {
@Override
public void run() {
@ -128,44 +123,41 @@ final class DelayedClientTransport implements ManagedClientTransport {
public final ClientStream newStream(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
try {
SubchannelPicker picker = null;
SubchannelPicker picker;
PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
long pickerVersion = -1;
synchronized (lock) {
if (!shutdown) {
if (shutdownStatus == null) {
if (lastPicker == null) {
return createPendingStream(args);
}
picker = lastPicker;
pickerVersion = lastPickerVersion;
} else {
return new FailingClientStream(shutdownStatus);
}
}
while (true) {
PickResult pickResult = picker.pickSubchannel(args);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
return transport.newStream(
args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions());
}
// This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible
// race with reprocess()), we will buffer it. Otherwise, will try with the new picker.
synchronized (lock) {
if (shutdownStatus != null) {
return new FailingClientStream(shutdownStatus);
}
if (pickerVersion == lastPickerVersion) {
return createPendingStream(args);
}
picker = lastPicker;
pickerVersion = lastPickerVersion;
}
}
if (picker != null) {
while (true) {
PickResult pickResult = picker.pickSubchannel(args);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
return transport.newStream(
args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions());
}
// This picker's conclusion is "buffer". If there hasn't been a newer picker set
// (possible race with reprocess()), we will buffer it. Otherwise, will try with the new
// picker.
synchronized (lock) {
if (shutdown) {
break;
}
if (pickerVersion == lastPickerVersion) {
return createPendingStream(args);
}
picker = lastPicker;
pickerVersion = lastPickerVersion;
}
}
}
return new FailingClientStream(Status.UNAVAILABLE.withDescription(
"Channel has shutdown (reported by delayed transport)"));
} finally {
channelExecutor.drain();
}
@ -196,13 +188,18 @@ final class DelayedClientTransport implements ManagedClientTransport {
* more buffered streams.
*/
@Override
public final void shutdown() {
public final void shutdown(final Status status) {
synchronized (lock) {
if (shutdown) {
if (shutdownStatus != null) {
return;
}
shutdown = true;
channelExecutor.executeLater(reportTransportShutdown);
shutdownStatus = status;
channelExecutor.executeLater(new Runnable() {
@Override
public void run() {
listener.transportShutdown(status);
}
});
if (pendingStreams == null || pendingStreams.isEmpty()) {
pendingStreams = null;
channelExecutor.executeLater(reportTransportTerminated);
@ -217,7 +214,7 @@ final class DelayedClientTransport implements ManagedClientTransport {
*/
@Override
public final void shutdownNow(Status status) {
shutdown();
shutdown(status);
Collection<PendingStream> savedPendingStreams = null;
synchronized (lock) {
if (pendingStreams != null) {
@ -307,7 +304,7 @@ final class DelayedClientTransport implements ManagedClientTransport {
// (which would shutdown the transports and LoadBalancer) because the gap should be shorter
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
channelExecutor.executeLater(reportTransportNotInUse);
if (shutdown) {
if (shutdownStatus != null) {
pendingStreams = null;
channelExecutor.executeLater(reportTransportTerminated);
} else {
@ -354,7 +351,7 @@ final class DelayedClientTransport implements ManagedClientTransport {
boolean justRemovedAnElement = pendingStreams.remove(this);
if (pendingStreams.isEmpty() && justRemovedAnElement) {
channelExecutor.executeLater(reportTransportNotInUse);
if (shutdown) {
if (shutdownStatus != null) {
pendingStreams = null;
channelExecutor.executeLater(reportTransportTerminated);
}

View File

@ -30,8 +30,8 @@ abstract class ForwardingConnectionClientTransport implements ConnectionClientTr
}
@Override
public void shutdown() {
delegate().shutdown();
public void shutdown(Status status) {
delegate().shutdown(status);
}
@Override

View File

@ -137,6 +137,9 @@ final class InternalSubchannel implements WithLogId {
@GuardedBy("lock")
private ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE);
@GuardedBy("lock")
private Status shutdownReason;
InternalSubchannel(EquivalentAddressGroup addressGroup, String authority, String userAgent,
BackoffPolicy.Provider backoffPolicyProvider,
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
@ -302,11 +305,13 @@ final class InternalSubchannel implements WithLogId {
channelExecutor.drain();
}
if (savedTransport != null) {
savedTransport.shutdown();
savedTransport.shutdown(
Status.UNAVAILABLE.withDescription(
"InternalSubchannel closed transport due to address change"));
}
}
public void shutdown() {
public void shutdown(Status reason) {
ManagedClientTransport savedActiveTransport;
ConnectionClientTransport savedPendingTransport;
try {
@ -314,6 +319,7 @@ final class InternalSubchannel implements WithLogId {
if (state.getState() == SHUTDOWN) {
return;
}
shutdownReason = reason;
gotoNonErrorState(SHUTDOWN);
savedActiveTransport = activeTransport;
savedPendingTransport = pendingTransport;
@ -332,10 +338,10 @@ final class InternalSubchannel implements WithLogId {
channelExecutor.drain();
}
if (savedActiveTransport != null) {
savedActiveTransport.shutdown();
savedActiveTransport.shutdown(reason);
}
if (savedPendingTransport != null) {
savedPendingTransport.shutdown();
savedPendingTransport.shutdown(reason);
}
}
@ -360,7 +366,7 @@ final class InternalSubchannel implements WithLogId {
}
void shutdownNow(Status reason) {
shutdown();
shutdown(reason);
Collection<ManagedClientTransport> transportsCopy;
try {
synchronized (lock) {
@ -424,12 +430,12 @@ final class InternalSubchannel implements WithLogId {
log.log(Level.FINE, "[{0}] {1} for {2} is ready",
new Object[] {logId, transport.getLogId(), address});
}
ConnectivityState savedState;
Status savedShutdownReason;
try {
synchronized (lock) {
savedState = state.getState();
savedShutdownReason = shutdownReason;
reconnectPolicy = null;
if (savedState == SHUTDOWN) {
if (savedShutdownReason != null) {
// activeTransport should have already been set to null by shutdown(). We keep it null.
Preconditions.checkState(activeTransport == null,
"Unexpected non-null activeTransport");
@ -442,8 +448,8 @@ final class InternalSubchannel implements WithLogId {
} finally {
channelExecutor.drain();
}
if (savedState == SHUTDOWN) {
transport.shutdown();
if (savedShutdownReason != null) {
transport.shutdown(savedShutdownReason);
}
}

View File

@ -84,6 +84,14 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
static final Status SHUTDOWN_NOW_STATUS =
Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
@VisibleForTesting
static final Status SHUTDOWN_STATUS =
Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
@VisibleForTesting
static final Status SUBCHANNEL_SHUTDOWN_STATUS =
Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
private final String target;
private final NameResolver.Factory nameResolverFactory;
private final Attributes nameResolverParams;
@ -470,7 +478,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
}
});
delayedTransport.shutdown();
delayedTransport.shutdown(SHUTDOWN_STATUS);
channelExecutor.executeLater(new Runnable() {
@Override
public void run() {
@ -659,7 +667,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
// shutdown even if "terminating" is already true. The subchannel will not be used in
// this case, because delayed transport has terminated when "terminating" becomes
// true, and no more requests will be sent to balancer beyond this point.
internalSubchannel.shutdown();
internalSubchannel.shutdown(SHUTDOWN_STATUS);
}
if (!terminated) {
// If channel has not terminated, it will track the subchannel and block termination
@ -904,7 +912,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
new Runnable() {
@Override
public void run() {
subchannel.shutdown();
subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
}
}), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
return;
@ -912,7 +920,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
}
// When terminating == true, no more real streams will be created. It's safe and also
// desirable to shutdown timely.
subchannel.shutdown();
subchannel.shutdown(SHUTDOWN_STATUS);
}
@Override

View File

@ -56,7 +56,7 @@ public interface ManagedClientTransport extends ClientTransport, WithLogId {
* {@link Listener#transportShutdown} callback called), or be transferred off this transport (in
* which case they may succeed). This method may only be called once.
*/
void shutdown();
void shutdown(Status reason);
/**
* Initiates a forceful shutdown in which preexisting and new calls are closed. Existing calls

View File

@ -109,7 +109,7 @@ final class OobChannel extends ManagedChannel implements WithLogId {
subchannelImpl = new AbstractSubchannel() {
@Override
public void shutdown() {
subchannel.shutdown();
subchannel.shutdown(Status.UNAVAILABLE.withDescription("OobChannel is shutdown"));
}
@Override
@ -179,7 +179,7 @@ final class OobChannel extends ManagedChannel implements WithLogId {
@Override
public ManagedChannel shutdown() {
shutdown = true;
delayedTransport.shutdown();
delayedTransport.shutdown(Status.UNAVAILABLE.withDescription("OobChannel.shutdown() called"));
return this;
}

View File

@ -78,6 +78,8 @@ public class DelayedClientTransportTest {
@Captor private ArgumentCaptor<ClientStreamListener> listenerCaptor;
private static final CallOptions.Key<Integer> SHARD_ID = CallOptions.Key.of("shard-id", -1);
private static final Status SHUTDOWN_STATUS =
Status.UNAVAILABLE.withDescription("shutdown called");
private final MethodDescriptor<String, Integer> method =
MethodDescriptor.<String, Integer>newBuilder()
@ -141,8 +143,8 @@ public class DelayedClientTransportTest {
assertTrue(stream instanceof DelayedStream);
delayedTransport.reprocess(mockPicker);
assertEquals(0, delayedTransport.getPendingStreamsCount());
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
delayedTransport.shutdown(SHUTDOWN_STATUS);
verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS));
verify(transportListener).transportTerminated();
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions));
@ -151,8 +153,8 @@ public class DelayedClientTransportTest {
}
@Test public void transportTerminatedThenAssignTransport() {
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
delayedTransport.shutdown(SHUTDOWN_STATUS);
verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS));
verify(transportListener).transportTerminated();
delayedTransport.reprocess(mockPicker);
verifyNoMoreInteractions(transportListener);
@ -160,8 +162,8 @@ public class DelayedClientTransportTest {
@Test public void assignTransportThenShutdownThenNewStream() {
delayedTransport.reprocess(mockPicker);
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
delayedTransport.shutdown(SHUTDOWN_STATUS);
verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS));
verify(transportListener).transportTerminated();
ClientStream stream = delayedTransport.newStream(method, headers, callOptions);
assertEquals(0, delayedTransport.getPendingStreamsCount());
@ -205,10 +207,10 @@ public class DelayedClientTransportTest {
@Test public void newStreamThenShutdownTransportThenAssignTransport() {
ClientStream stream = delayedTransport.newStream(method, headers, callOptions);
stream.start(streamListener);
delayedTransport.shutdown();
delayedTransport.shutdown(SHUTDOWN_STATUS);
// Stream is still buffered
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS));
verify(transportListener, times(0)).transportTerminated();
assertEquals(1, delayedTransport.getPendingStreamsCount());
@ -236,8 +238,8 @@ public class DelayedClientTransportTest {
@Test public void newStreamThenShutdownTransportThenCancelStream() {
ClientStream stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
delayedTransport.shutdown(SHUTDOWN_STATUS);
verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS));
verify(transportListener, times(0)).transportTerminated();
assertEquals(1, delayedTransport.getPendingStreamsCount());
stream.cancel(Status.CANCELLED);
@ -248,8 +250,8 @@ public class DelayedClientTransportTest {
}
@Test public void shutdownThenNewStream() {
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
delayedTransport.shutdown(SHUTDOWN_STATUS);
verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS));
verify(transportListener).transportTerminated();
ClientStream stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(streamListener);
@ -426,8 +428,8 @@ public class DelayedClientTransportTest {
assertEquals(1, delayedTransport.getPendingStreamsCount());
// wfr5 will stop delayed transport from terminating
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
delayedTransport.shutdown(SHUTDOWN_STATUS);
verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS));
verify(transportListener, never()).transportTerminated();
// ... until it's gone
picker = mock(SubchannelPicker.class);

View File

@ -65,6 +65,7 @@ public class InternalSubchannelTest {
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE);
private static final ConnectivityStateInfo RESOURCE_EXHAUSTED_STATE =
ConnectivityStateInfo.forTransientFailure(Status.RESOURCE_EXHAUSTED);
private static final Status SHUTDOWN_REASON = Status.UNAVAILABLE.withDescription("for test");
// For scheduled executor
private final FakeClock fakeClock = new FakeClock();
@ -392,7 +393,7 @@ public class InternalSubchannelTest {
internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr2, addr3)));
assertNoCallbackInvoke();
assertEquals(READY, internalSubchannel.getState());
verify(transports.peek().transport, never()).shutdown();
verify(transports.peek().transport, never()).shutdown(any(Status.class));
verify(transports.peek().transport, never()).shutdownNow(any(Status.class));
// And new addresses chosen when re-connecting
@ -433,7 +434,7 @@ public class InternalSubchannelTest {
internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr2, addr3)));
assertNoCallbackInvoke();
assertEquals(CONNECTING, internalSubchannel.getState());
verify(transports.peek().transport, never()).shutdown();
verify(transports.peek().transport, never()).shutdown(any(Status.class));
verify(transports.peek().transport, never()).shutdownNow(any(Status.class));
// And new addresses chosen when re-connecting
@ -507,7 +508,7 @@ public class InternalSubchannelTest {
internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr3, addr4)));
assertExactCallbackInvokes("onStateChange:IDLE");
assertEquals(IDLE, internalSubchannel.getState());
verify(transports.peek().transport).shutdown();
verify(transports.peek().transport).shutdown(any(Status.class));
// And new addresses chosen when re-connecting
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
@ -551,7 +552,7 @@ public class InternalSubchannelTest {
assertEquals(CONNECTING, internalSubchannel.getState());
// And new addresses chosen immediately
verify(transports.poll().transport).shutdown();
verify(transports.poll().transport).shutdown(any(Status.class));
assertNoCallbackInvoke();
assertEquals(CONNECTING, internalSubchannel.getState());
@ -622,8 +623,8 @@ public class InternalSubchannelTest {
transportInfo.listener.transportReady();
assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
internalSubchannel.shutdown();
verify(transportInfo.transport).shutdown();
internalSubchannel.shutdown(SHUTDOWN_REASON);
verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON));
assertExactCallbackInvokes("onStateChange:SHUTDOWN");
transportInfo.listener.transportTerminated();
@ -661,7 +662,7 @@ public class InternalSubchannelTest {
assertNotNull("There should be at least one reconnectTask", reconnectTask);
// Shut down InternalSubchannel before the transport is created.
internalSubchannel.shutdown();
internalSubchannel.shutdown(SHUTDOWN_REASON);
assertTrue(reconnectTask.isCancelled());
// InternalSubchannel terminated promptly.
assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated");
@ -694,11 +695,11 @@ public class InternalSubchannelTest {
// Shutdown the InternalSubchannel before the pending transport is ready
assertNull(internalSubchannel.obtainActiveTransport());
internalSubchannel.shutdown();
internalSubchannel.shutdown(SHUTDOWN_REASON);
assertExactCallbackInvokes("onStateChange:SHUTDOWN");
// The transport should've been shut down even though it's not the active transport yet.
verify(transportInfo.transport).shutdown();
verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON));
transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
assertNoCallbackInvoke();
transportInfo.listener.transportTerminated();
@ -735,7 +736,7 @@ public class InternalSubchannelTest {
SocketAddress addr = mock(SocketAddress.class);
createInternalSubchannel(addr);
internalSubchannel.shutdown();
internalSubchannel.shutdown(SHUTDOWN_REASON);
assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated");
assertEquals(SHUTDOWN, internalSubchannel.getState());
assertNull(internalSubchannel.obtainActiveTransport());

View File

@ -399,7 +399,11 @@ public class ManagedChannelImplTest {
}
// LoadBalancer should shutdown the subchannel
subchannel.shutdown();
verify(mockTransport).shutdown();
if (shutdownNow) {
verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS));
} else {
verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS));
}
// Killing the remaining real transport will terminate the channel
transportListener.transportShutdown(Status.UNAVAILABLE);
@ -417,10 +421,6 @@ public class ManagedChannelImplTest {
verifyNoMoreInteractions(mockTransport);
}
@Test
public void shutdownNowWithMultipleOobChannels() {
}
@Test
public void interceptor() throws Exception {
final AtomicLong atomic = new AtomicLong();
@ -745,18 +745,18 @@ public class ManagedChannelImplTest {
sub1.shutdown();
timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS);
sub1.shutdown();
verify(transportInfo1.transport, never()).shutdown();
verify(transportInfo1.transport, never()).shutdown(any(Status.class));
timer.forwardTime(1, TimeUnit.SECONDS);
verify(transportInfo1.transport).shutdown();
verify(transportInfo1.transport).shutdown(same(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_STATUS));
// ... but not after Channel is terminating
verify(mockLoadBalancer, never()).shutdown();
channel.shutdown();
verify(mockLoadBalancer).shutdown();
verify(transportInfo2.transport, never()).shutdown();
verify(transportInfo2.transport, never()).shutdown(any(Status.class));
sub2.shutdown();
verify(transportInfo2.transport).shutdown();
verify(transportInfo2.transport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS));
}
@Test

View File

@ -540,8 +540,7 @@ class NettyClientHandler extends AbstractNettyHandler {
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
ChannelPromise promise) throws Exception {
lifecycleManager.notifyShutdown(
Status.UNAVAILABLE.withDescription("Channel requested transport to shut down"));
lifecycleManager.notifyShutdown(msg.getStatus());
close(ctx, promise);
connection().forEachActiveStream(new Http2StreamVisitor() {
@Override

View File

@ -248,16 +248,14 @@ class NettyClientTransport implements ConnectionClientTransport {
}
@Override
public void shutdown() {
public void shutdown(Status reason) {
// start() could have failed
if (channel == null) {
return;
}
// Notifying of termination is automatically done when the channel closes.
if (channel.isOpen()) {
Status status
= Status.UNAVAILABLE.withDescription("Channel requested transport to shut down");
handler.getWriteQueue().enqueue(new GracefulCloseCommand(status), true);
handler.getWriteQueue().enqueue(new GracefulCloseCommand(reason), true);
}
}

View File

@ -118,7 +118,7 @@ public class NettyClientTransportTest {
public void teardown() throws Exception {
Context.ROOT.attach();
for (NettyClientTransport transport : transports) {
transport.shutdown();
transport.shutdown(Status.UNAVAILABLE);
}
if (server != null) {

View File

@ -587,13 +587,13 @@ class OkHttpClientTransport implements ConnectionClientTransport {
}
@Override
public void shutdown() {
public void shutdown(Status reason) {
synchronized (lock) {
if (goAwayStatus != null) {
return;
}
goAwayStatus = Status.UNAVAILABLE.withDescription("Transport stopped");
goAwayStatus = reason;
listener.transportShutdown(goAwayStatus);
stopIfNecessary();
}
@ -601,7 +601,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
@Override
public void shutdownNow(Status reason) {
shutdown();
shutdown(reason);
synchronized (lock) {
Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
while (it.hasNext()) {

View File

@ -27,6 +27,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@ -34,6 +35,7 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@ -112,6 +114,7 @@ public class OkHttpClientTransportTest {
private static final String ERROR_MESSAGE = "simulated error";
// The gRPC header length, which includes 1 byte compression flag and 4 bytes message length.
private static final int HEADER_LENGTH = 5;
private static final Status SHUTDOWN_REASON = Status.UNAVAILABLE.withDescription("for test");
@Rule
public Timeout globalTimeout = new Timeout(10 * 1000);
@ -700,10 +703,10 @@ public class OkHttpClientTransportTest {
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream2.start(listener2);
assertEquals(2, activeStreamCount());
clientTransport.shutdown();
clientTransport.shutdown(SHUTDOWN_REASON);
assertEquals(2, activeStreamCount());
verify(transportListener).transportShutdown(isA(Status.class));
verify(transportListener).transportShutdown(same(SHUTDOWN_REASON));
stream1.cancel(Status.CANCELLED);
stream2.cancel(Status.CANCELLED);
@ -912,7 +915,7 @@ public class OkHttpClientTransportTest {
stream.start(listener);
waitForStreamPending(1);
clientTransport.shutdown();
clientTransport.shutdown(SHUTDOWN_REASON);
setMaxConcurrentStreams(1);
verify(frameWriter, timeout(TIME_OUT_MS))
.synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader());
@ -1261,20 +1264,18 @@ public class OkHttpClientTransportTest {
clientTransport.ping(callback, MoreExecutors.directExecutor());
assertEquals(0, callback.invocationCount);
clientTransport.shutdown();
clientTransport.shutdown(SHUTDOWN_REASON);
// ping failed on channel shutdown
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertEquals(Status.Code.UNAVAILABLE,
((StatusException) callback.failureCause).getStatus().getCode());
assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus());
// now that handler is in terminal state, all future pings fail immediately
callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor());
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertEquals(Status.Code.UNAVAILABLE,
((StatusException) callback.failureCause).getStatus().getCode());
assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus());
shutdownAndVerify();
}
@ -1354,7 +1355,7 @@ public class OkHttpClientTransportTest {
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);
clientTransport.shutdown();
clientTransport.shutdown(SHUTDOWN_REASON);
allowTransportConnected();
// The new stream should be failed, but not the pending stream.
@ -1846,7 +1847,7 @@ public class OkHttpClientTransportTest {
}
private void shutdownAndVerify() {
clientTransport.shutdown();
clientTransport.shutdown(SHUTDOWN_REASON);
assertEquals(0, activeStreamCount());
try {
verify(frameWriter, timeout(TIME_OUT_MS)).close();

View File

@ -302,10 +302,10 @@ public abstract class AbstractTransportTest {
client = newClientTransport(server);
InOrder inOrder = inOrder(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
client.shutdown();
Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called");
client.shutdown(shutdownReason);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
inOrder.verify(mockClientTransportListener).transportShutdown(statusCaptor.capture());
assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue());
inOrder.verify(mockClientTransportListener).transportShutdown(same(shutdownReason));
inOrder.verify(mockClientTransportListener).transportTerminated();
verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
}
@ -319,7 +319,7 @@ public abstract class AbstractTransportTest {
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
client.shutdown();
client.shutdown(Status.UNAVAILABLE);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
inOrder.verify(mockClientTransportListener).transportShutdown(any(Status.class));
inOrder.verify(mockClientTransportListener).transportTerminated();
@ -356,7 +356,7 @@ public abstract class AbstractTransportTest {
ServerStream serverStream = serverStreamCreation.stream;
ServerStreamListener mockServerStreamListener = serverStreamCreation.listener;
client.shutdown();
client.shutdown(Status.UNAVAILABLE);
client = null;
server.shutdown();
serverTransport.shutdown();
@ -494,7 +494,7 @@ public abstract class AbstractTransportTest {
// Stream prevents termination
ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions);
stream.start(mockClientStreamListener);
client.shutdown();
client.shutdown(Status.UNAVAILABLE);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class));
ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class);
try {
@ -513,7 +513,8 @@ public abstract class AbstractTransportTest {
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
client.shutdown();
Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called");
client.shutdown(shutdownReason);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class);
try {
@ -524,7 +525,7 @@ public abstract class AbstractTransportTest {
}
verify(mockPingCallback, timeout(TIMEOUT_MS)).onFailure(throwableCaptor.capture());
Status status = Status.fromThrowable(throwableCaptor.getValue());
assertCodeEquals(Status.UNAVAILABLE, status);
assertSame(shutdownReason, status);
}
@Test
@ -540,7 +541,7 @@ public abstract class AbstractTransportTest {
any(CallOptions.class), any(Metadata.class));
}
stream.start(mockClientStreamListener);
client.shutdown();
client.shutdown(Status.UNAVAILABLE);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class));
ClientStream stream2 = client.newStream(methodDescriptor, new Metadata(), callOptions);
@ -594,19 +595,19 @@ public abstract class AbstractTransportTest {
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
client.shutdown();
Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called");
client.shutdown(shutdownReason);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
Thread.sleep(100);
ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions);
stream.start(mockClientStreamListener);
verify(mockClientStreamListener, timeout(TIMEOUT_MS))
.closed(statusCaptor.capture(), any(Metadata.class));
.closed(same(shutdownReason), any(Metadata.class));
verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue());
if (metricsExpected()) {
verify(clientStreamTracerFactory).newClientStreamTracer(
any(CallOptions.class), any(Metadata.class));
assertSame(statusCaptor.getValue(), clientStreamTracer1.getStatus());
assertSame(shutdownReason, clientStreamTracer1.getStatus());
// Assert no interactions
assertNull(serverStreamTracer1.getServerCall());
}
@ -1477,7 +1478,7 @@ public abstract class AbstractTransportTest {
verify(mockClientStreamListener, timeout(TIMEOUT_MS))
.closed(any(Status.class), any(Metadata.class));
verify(mockServerStreamListener, timeout(TIMEOUT_MS)).closed(any(Status.class));
client.shutdown();
client.shutdown(Status.UNAVAILABLE);
}
/**