Implement shutdownNow

Fixes #448
This commit is contained in:
Eric Anderson 2016-03-07 09:04:49 -08:00
parent 631a9d5fac
commit c6fd94ca85
18 changed files with 529 additions and 19 deletions

View File

@ -64,8 +64,6 @@ public abstract class ManagedChannel extends Channel {
* Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
* forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
* return {@code false} immediately after this method returns.
*
* <p>NOT YET IMPLEMENTED. This method currently behaves identically to shutdown().
*/
public abstract ManagedChannel shutdownNow();

View File

@ -52,7 +52,9 @@ import io.grpc.internal.ServerTransportListener;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.logging.Level;
@ -171,6 +173,22 @@ class InProcessTransport implements ServerTransport, ManagedClientTransport {
}
}
@Override
public void shutdownNow(Status reason) {
checkNotNull(reason, "reason");
List<InProcessStream> streamsCopy;
synchronized (this) {
shutdown();
if (terminated) {
return;
}
streamsCopy = new ArrayList<InProcessStream>(streams);
}
for (InProcessStream stream : streamsCopy) {
stream.clientStream.cancel(reason);
}
}
@Override
public String toString() {
return getLogId() + "(" + name + ")";
@ -460,6 +478,7 @@ class InProcessTransport implements ServerTransport, ManagedClientTransport {
return serverRequested > 0;
}
// Must be thread-safe for shutdownNow()
@Override
public void cancel(Status reason) {
if (!internalCancel(stripCause(reason))) {

View File

@ -155,6 +155,7 @@ class DelayedClientTransport implements ManagedClientTransport {
* Shuts down this transport and cancels all streams that it owns, hence immediately terminates
* this transport.
*/
@Override
public void shutdownNow(Status status) {
shutdown();
Collection<PendingStream> savedPendingStreams = null;

View File

@ -128,12 +128,14 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
new HashSet<DelayedClientTransport>();
@GuardedBy("lock")
private final HashSet<OobTransportProvider<ClientTransport>> oobTransports =
new HashSet<OobTransportProvider<ClientTransport>>();
private final HashSet<OobTransportProviderImpl> oobTransports =
new HashSet<OobTransportProviderImpl>();
@GuardedBy("lock")
private boolean shutdown;
@GuardedBy("lock")
private boolean shutdownNowed;
@GuardedBy("lock")
private boolean terminated;
private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
@ -255,6 +257,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
ArrayList<TransportSet> transportsCopy = new ArrayList<TransportSet>();
ArrayList<DelayedClientTransport> delayedTransportsCopy =
new ArrayList<DelayedClientTransport>();
ArrayList<OobTransportProviderImpl> oobTransportsCopy =
new ArrayList<OobTransportProviderImpl>();
synchronized (lock) {
if (shutdown) {
return this;
@ -266,6 +270,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
if (!terminated) {
transportsCopy.addAll(transports.values());
delayedTransportsCopy.addAll(delayedTransports);
oobTransportsCopy.addAll(oobTransports);
}
}
loadBalancer.shutdown();
@ -276,7 +281,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
for (DelayedClientTransport transport : delayedTransportsCopy) {
transport.shutdown();
}
for (OobTransportProvider<ClientTransport> provider : oobTransports) {
for (OobTransportProviderImpl provider : oobTransportsCopy) {
provider.close();
}
if (log.isLoggable(Level.FINE)) {
@ -289,14 +294,39 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
* Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
* forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
* return {@code false} immediately after this method returns.
*
* <p>NOT YET IMPLEMENTED. This method currently behaves identically to shutdown().
*/
// TODO(ejona86): cancel preexisting calls.
@Override
public ManagedChannelImpl shutdownNow() {
synchronized (lock) {
// Short-circuiting not strictly necessary, but prevents transports from needing to handle
// multiple shutdownNow invocations.
if (shutdownNowed) {
return this;
}
shutdownNowed = true;
}
shutdown();
// TODO(zhangkun): also call shutdownNow() on oobTransports.
List<TransportSet> transportsCopy;
List<DelayedClientTransport> delayedTransportsCopy;
List<OobTransportProviderImpl> oobTransportsCopy;
synchronized (lock) {
transportsCopy = new ArrayList<TransportSet>(transports.values());
delayedTransportsCopy = new ArrayList<DelayedClientTransport>(delayedTransports);
oobTransportsCopy = new ArrayList<OobTransportProviderImpl>(oobTransports);
}
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Shutting down now", getLogId());
}
Status nowStatus = Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
for (TransportSet ts : transportsCopy) {
ts.shutdownNow(nowStatus);
}
for (DelayedClientTransport transport : delayedTransportsCopy) {
transport.shutdownNow(nowStatus);
}
for (OobTransportProviderImpl provider : oobTransportsCopy) {
provider.shutdownNow(nowStatus);
}
return this;
}
@ -549,5 +579,11 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
transportSet.shutdown();
}
}
void shutdownNow(Status reason) {
if (transportSet != null) {
transportSet.shutdownNow(reason);
}
}
}
}

View File

@ -68,6 +68,12 @@ public interface ManagedClientTransport extends ClientTransport, WithLogId {
*/
void shutdown();
/**
* Initiates a forceful shutdown in which preexisting and new calls are closed. Existing calls
* should be closed with the provided {@code reason}.
*/
void shutdownNow(Status reason);
/**
* Receives notifications for the transport life-cycle events. Implementation does not need to be
* thread-safe, so notifications must be properly synchronized externally.

View File

@ -84,6 +84,10 @@ public final class ServerImpl extends io.grpc.Server {
private final HandlerRegistry fallbackRegistry;
private boolean started;
private boolean shutdown;
/** non-{@code null} if immediate shutdown has been requested. */
private Status shutdownNowStatus;
/** {@code true} if ServerListenerImpl.serverShutdown() was called. */
private boolean serverShutdownCallbackInvoked;
private boolean terminated;
/** Service encapsulating something similar to an accept() socket. */
private final InternalServer transportServer;
@ -179,13 +183,31 @@ public final class ServerImpl extends io.grpc.Server {
return this;
}
/**
* NOT YET IMPLEMENTED. This method currently behaves identically to shutdown().
*/
// TODO(ejona86): cancel preexisting calls.
@Override
public ServerImpl shutdownNow() {
shutdown();
Collection<ServerTransport> transportsCopy;
Status nowStatus = Status.UNAVAILABLE.withDescription("Server shutdownNow invoked");
boolean savedServerShutdownCallbackInvoked;
synchronized (lock) {
// Short-circuiting not strictly necessary, but prevents transports from needing to handle
// multiple shutdownNow invocations if shutdownNow is called multiple times.
if (shutdownNowStatus != null) {
return this;
}
shutdownNowStatus = nowStatus;
transportsCopy = new ArrayList<ServerTransport>(transports);
savedServerShutdownCallbackInvoked = serverShutdownCallbackInvoked;
}
// Short-circuiting not strictly necessary, but prevents transports from needing to handle
// multiple shutdownNow invocations, between here and the serverShutdown callback.
if (serverShutdownCallbackInvoked) {
// Have to call shutdownNow, because serverShutdown callback only called shutdown, not
// shutdownNow
for (ServerTransport transport : transportsCopy) {
transport.shutdownNow(nowStatus);
}
}
return this;
}
@ -265,13 +287,20 @@ public final class ServerImpl extends io.grpc.Server {
@Override
public void serverShutdown() {
ArrayList<ServerTransport> copiedTransports;
Status shutdownNowStatusCopy;
synchronized (lock) {
// transports collection can be modified during shutdown(), even if we hold the lock, due
// to reentrancy.
copiedTransports = new ArrayList<ServerTransport>(transports);
shutdownNowStatusCopy = shutdownNowStatus;
serverShutdownCallbackInvoked = true;
}
for (ServerTransport transport : copiedTransports) {
transport.shutdown();
if (shutdownNowStatusCopy == null) {
transport.shutdown();
} else {
transport.shutdownNow(shutdownNowStatusCopy);
}
}
synchronized (lock) {
transportServerTerminated = true;

View File

@ -31,6 +31,8 @@
package io.grpc.internal;
import io.grpc.Status;
/** An inbound connection. */
public interface ServerTransport {
/**
@ -39,4 +41,10 @@ public interface ServerTransport {
* be processed on a separate thread. May only be called once.
*/
void shutdown();
/**
* Initiates a forceful shutdown in which preexisting and new calls are closed. Existing calls
* should be closed with the provided {@code reason}.
*/
void shutdownNow(Status reason);
}

View File

@ -292,6 +292,17 @@ final class TransportSet implements WithLogId {
}
}
void shutdownNow(Status reason) {
shutdown();
Collection<ManagedClientTransport> transportsCopy;
synchronized (lock) {
transportsCopy = new ArrayList<ManagedClientTransport>(transports);
}
for (ManagedClientTransport transport : transportsCopy) {
transport.shutdownNow(reason);
}
}
@GuardedBy("lock")
private void cancelReconnectTask() {
if (reconnectTask != null) {

View File

@ -264,6 +264,113 @@ public class ManagedChannelImplTest {
verifyNoMoreInteractions(mockStream);
}
@Test
public void callAndShutdownNow() {
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
ManagedChannel channel = createChannel(nameResolverFactory, NO_INTERCEPTOR);
verifyNoMoreInteractions(mockTransportFactory);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
verifyNoMoreInteractions(mockTransportFactory);
// Create transport and call
ClientStream mockStream = mock(ClientStream.class);
Metadata headers = new Metadata();
when(mockTransportFactory.newClientTransport(
any(SocketAddress.class), any(String.class), any(String.class)))
.thenReturn(mockTransport);
when(mockTransport.newStream(same(method), same(headers))).thenReturn(mockStream);
call.start(mockCallListener, headers);
verify(mockTransportFactory, timeout(1000))
.newClientTransport(same(socketAddress), eq(authority), any(String.class));
verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture());
ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue();
transportListener.transportReady();
verify(mockTransport, timeout(1000)).newStream(same(method), same(headers));
verify(mockStream, timeout(1000)).start(streamListenerCaptor.capture());
verify(mockStream).setCompressor(isA(Compressor.class));
// Depends on how quick the real transport is created, ClientCallImpl may start on mockStream
// directly, or on a DelayedStream which later starts mockStream. In the former case,
// setMessageCompression() is not called. In the latter case, it is (in
// DelayedStream.startStream()).
verify(mockStream, atMost(1)).setMessageCompression(anyBoolean());
ClientStreamListener streamListener = streamListenerCaptor.getValue();
// ShutdownNow
channel.shutdownNow();
assertTrue(channel.isShutdown());
assertFalse(channel.isTerminated());
// ShutdownNow may or may not invoke shutdown. Ideally it wouldn't, but it doesn't matter much
// either way.
verify(mockTransport, atMost(1)).shutdown();
verify(mockTransport).shutdownNow(any(Status.class));
assertEquals(1, nameResolverFactory.resolvers.size());
assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
assertEquals(1, loadBalancerFactory.balancers.size());
verify(loadBalancerFactory.balancers.get(0)).shutdown();
// Further calls should fail without going to the transport
ClientCall<String, Integer> call3 = channel.newCall(method, CallOptions.DEFAULT);
call3.start(mockCallListener3, new Metadata());
verify(mockCallListener3, timeout(1000))
.onClose(statusCaptor.capture(), any(Metadata.class));
assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
// Finish shutdown
transportListener.transportShutdown(Status.CANCELLED);
assertFalse(channel.isTerminated());
Metadata trailers = new Metadata();
streamListener.closed(Status.CANCELLED, trailers);
verify(mockCallListener, timeout(1000)).onClose(Status.CANCELLED, trailers);
assertFalse(channel.isTerminated());
transportListener.transportTerminated();
assertTrue(channel.isTerminated());
verify(mockTransportFactory).close();
verifyNoMoreInteractions(mockTransportFactory);
verify(mockTransport, atLeast(0)).getLogId();
verifyNoMoreInteractions(mockTransport);
verifyNoMoreInteractions(mockStream);
}
/** Make sure shutdownNow() after shutdown() has an effect. */
@Test
public void callAndShutdownAndShutdownNow() {
ManagedChannel channel = createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
// Create transport and call
ClientStream mockStream = mock(ClientStream.class);
Metadata headers = new Metadata();
when(mockTransport.newStream(same(method), same(headers))).thenReturn(mockStream);
call.start(mockCallListener, headers);
verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture());
ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue();
transportListener.transportReady();
verify(mockStream, timeout(1000)).start(streamListenerCaptor.capture());
ClientStreamListener streamListener = streamListenerCaptor.getValue();
// ShutdownNow
channel.shutdown();
channel.shutdownNow();
// ShutdownNow may or may not invoke shutdown. Ideally it wouldn't, but it doesn't matter much
// either way.
verify(mockTransport, atMost(2)).shutdown();
verify(mockTransport).shutdownNow(any(Status.class));
// Finish shutdown
transportListener.transportShutdown(Status.CANCELLED);
assertFalse(channel.isTerminated());
Metadata trailers = new Metadata();
streamListener.closed(Status.CANCELLED, trailers);
verify(mockCallListener, timeout(1000)).onClose(Status.CANCELLED, trailers);
assertFalse(channel.isTerminated());
transportListener.transportTerminated();
assertTrue(channel.isTerminated());
}
@Test
public void interceptor() throws Exception {
final AtomicLong atomic = new AtomicLong();

View File

@ -370,4 +370,20 @@ public class ManagedChannelImplTransportManagerTest {
transportInfo2.listener.transportTerminated();
assertTrue(channel.isTerminated());
}
@Test
public void interimTransportShutdownNow() {
InterimTransport<ClientTransport> interimTransport = tm.createInterimTransport();
ClientTransport transport = interimTransport.transport();
assertTrue(transport instanceof DelayedClientTransport);
ClientStream s1 = transport.newStream(method, new Metadata());
ClientStreamListener sl1 = mock(ClientStreamListener.class);
s1.start(sl1);
// Shutting down the channel will shutdownNow the interim transport, thus kill existing streams.
channel.shutdownNow();
verify(sl1).closed(any(Status.class), any(Metadata.class));
assertTrue(channel.isShutdown());
assertTrue(channel.isTerminated());
}
}

View File

@ -198,6 +198,95 @@ public class ServerImplTest {
assertTrue(server.isTerminated());
}
@Test
public void startShutdownNowImmediateWithChildTransport() throws IOException {
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
SERVER_CONTEXT, decompressorRegistry, compressorRegistry);
server.start();
class DelayedShutdownServerTransport extends SimpleServerTransport {
boolean shutdown;
@Override
public void shutdown() {}
@Override
public void shutdownNow(Status reason) {
shutdown = true;
}
}
DelayedShutdownServerTransport serverTransport = new DelayedShutdownServerTransport();
transportServer.registerNewServerTransport(serverTransport);
server.shutdownNow();
assertTrue(server.isShutdown());
assertFalse(server.isTerminated());
assertTrue(serverTransport.shutdown);
serverTransport.listener.transportTerminated();
assertTrue(server.isTerminated());
}
@Test
public void shutdownNowAfterShutdown() throws IOException {
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
SERVER_CONTEXT, decompressorRegistry, compressorRegistry);
server.start();
class DelayedShutdownServerTransport extends SimpleServerTransport {
boolean shutdown;
@Override
public void shutdown() {}
@Override
public void shutdownNow(Status reason) {
shutdown = true;
}
}
DelayedShutdownServerTransport serverTransport = new DelayedShutdownServerTransport();
transportServer.registerNewServerTransport(serverTransport);
server.shutdown();
assertTrue(server.isShutdown());
server.shutdownNow();
assertFalse(server.isTerminated());
assertTrue(serverTransport.shutdown);
serverTransport.listener.transportTerminated();
assertTrue(server.isTerminated());
}
@Test
public void shutdownNowAfterSlowShutdown() throws IOException {
SimpleServer transportServer = new SimpleServer() {
@Override
public void shutdown() {
// Don't call super which calls listener.serverShutdown(). We'll call it manually.
}
};
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
SERVER_CONTEXT, decompressorRegistry, compressorRegistry);
server.start();
class DelayedShutdownServerTransport extends SimpleServerTransport {
boolean shutdown;
@Override
public void shutdown() {}
@Override
public void shutdownNow(Status reason) {
shutdown = true;
}
}
DelayedShutdownServerTransport serverTransport = new DelayedShutdownServerTransport();
transportServer.registerNewServerTransport(serverTransport);
server.shutdown();
server.shutdownNow();
transportServer.listener.serverShutdown();
assertTrue(server.isShutdown());
assertFalse(server.isTerminated());
serverTransport.listener.transportTerminated();
assertTrue(server.isTerminated());
}
@Test
public void transportServerFailsStartup() {
final IOException ex = new IOException();
@ -641,5 +730,10 @@ public class ServerImplTest {
public void shutdown() {
listener.transportTerminated();
}
@Override
public void shutdownNow(Status status) {
listener.transportTerminated();
}
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.netty;
import io.grpc.Status;
/**
* A command to trigger close and close all streams. It is buffered differently than normal close
* and also includes reason for closure.
*/
class ForcefulCloseCommand {
private final Status status;
public ForcefulCloseCommand(Status status) {
this.status = status;
}
public Status getStatus() {
return status;
}
}

View File

@ -198,6 +198,8 @@ class NettyClientHandler extends AbstractNettyHandler {
sendPingFrame(ctx, (SendPingCommand) msg, promise);
} else if (msg instanceof GracefulCloseCommand) {
gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
} else if (msg instanceof ForcefulCloseCommand) {
forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
} else if (msg == NOOP_MESSAGE) {
ctx.write(Unpooled.EMPTY_BUFFER, promise);
} else {
@ -467,6 +469,25 @@ class NettyClientHandler extends AbstractNettyHandler {
close(ctx, promise);
}
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
ChannelPromise promise) throws Exception {
lifecycleManager.notifyShutdown(
Status.UNAVAILABLE.withDescription("Channel requested transport to shut down"));
close(ctx, promise);
connection().forEachActiveStream(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
NettyClientStream clientStream = clientStream(stream);
if (clientStream != null) {
clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
}
stream.close();
return true;
}
});
}
/**
* Handler for a GOAWAY being either sent or received. Fails any streams created after the
* last known stream.

View File

@ -195,6 +195,14 @@ class NettyClientTransport implements ManagedClientTransport {
}
}
@Override
public void shutdownNow(Status reason) {
// Notifying of termination is automatically done when the channel closes.
if (channel != null && channel.isOpen()) {
handler.getWriteQueue().enqueue(new ForcefulCloseCommand(reason), true);
}
}
@Override
public String toString() {
return getLogId() + "(" + address + ")";

View File

@ -290,7 +290,7 @@ class NettyServerHandler extends AbstractNettyHandler {
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Http2Exception {
throws Exception {
if (msg instanceof SendGrpcFrameCommand) {
sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
} else if (msg instanceof SendResponseHeadersCommand) {
@ -299,6 +299,8 @@ class NettyServerHandler extends AbstractNettyHandler {
((RequestMessagesCommand) msg).requestMessages();
} else if (msg instanceof CancelServerStreamCommand) {
cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
} else if (msg instanceof ForcefulCloseCommand) {
forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
} else {
AssertionError e =
new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
@ -360,6 +362,23 @@ class NettyServerHandler extends AbstractNettyHandler {
encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
}
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
ChannelPromise promise) throws Exception {
close(ctx, promise);
connection().forEachActiveStream(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
NettyServerStream.TransportState serverStream = serverStream(stream);
if (serverStream != null) {
serverStream.transportReportStatus(msg.getStatus());
resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
}
stream.close();
return true;
}
});
}
private void verifyContentType(int streamId, Http2Headers headers) throws Http2Exception {
CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
if (contentType == null) {

View File

@ -33,6 +33,7 @@ package io.grpc.netty;
import com.google.common.base.Preconditions;
import io.grpc.Status;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.netty.channel.Channel;
@ -94,6 +95,13 @@ class NettyServerTransport implements ServerTransport {
}
}
@Override
public void shutdownNow(Status reason) {
if (channel.isOpen()) {
channel.writeAndFlush(new ForcefulCloseCommand(reason));
}
}
/**
* For testing purposes only.
*/

View File

@ -452,6 +452,26 @@ class OkHttpClientTransport implements ManagedClientTransport {
}
}
@Override
public void shutdownNow(Status reason) {
shutdown();
synchronized (lock) {
Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Integer, OkHttpClientStream> entry = it.next();
it.remove();
entry.getValue().transportReportStatus(reason, false, new Metadata());
}
for (OkHttpClientStream stream : pendingStreams) {
stream.transportReportStatus(reason, true, new Metadata());
}
pendingStreams.clear();
stopIfNecessary();
}
}
/**
* Gets all active streams as an array.
*/

View File

@ -41,6 +41,7 @@ import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@ -145,12 +146,10 @@ public abstract class AbstractTransportTest {
@After
public void tearDown() throws InterruptedException {
if (client != null) {
// TODO(ejona): swap to shutdownNow
client.shutdown();
client.shutdownNow(Status.UNKNOWN.withDescription("teardown"));
}
if (serverTransport != null) {
// TODO(ejona): swap to shutdownNow
serverTransport.shutdown();
serverTransport.shutdownNow(Status.UNKNOWN.withDescription("teardown"));
}
if (server != null) {
server.shutdown();
@ -316,6 +315,66 @@ public abstract class AbstractTransportTest {
assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
@Test
public void shutdownNowKillsClientStream() throws Exception {
server.start(serverListener);
client.start(mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata());
clientStream.start(mockClientStreamListener);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
ServerStreamListener mockServerStreamListener = serverStreamCreation.listener;
Status status = Status.UNKNOWN.withDescription("test shutdownNow");
client.shutdownNow(status);
client = null;
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class));
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertTrue(serverTransportListener.isTerminated());
verify(mockClientStreamListener, timeout(TIMEOUT_MS))
.closed(same(status), any(Metadata.class));
verify(mockServerStreamListener, timeout(TIMEOUT_MS)).closed(any(Status.class));
}
@Test
public void shutdownNowKillsServerStream() throws Exception {
server.start(serverListener);
client.start(mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata());
clientStream.start(mockClientStreamListener);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
ServerStreamListener mockServerStreamListener = serverStreamCreation.listener;
serverTransport.shutdownNow(Status.UNKNOWN.withDescription("test shutdownNow"));
serverTransport = null;
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class));
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertTrue(serverTransportListener.isTerminated());
verify(mockClientStreamListener, timeout(TIMEOUT_MS))
.closed(any(Status.class), any(Metadata.class));
// Generally will be same status provided to shutdownNow, but InProcessTransport can't
// differentiate between client and server shutdownNow. The status is not really used on
// server-side, so we don't care much.
verify(mockServerStreamListener, timeout(TIMEOUT_MS)).closed(any(Status.class));
}
@Test
public void ping() throws Exception {
server.start(serverListener);