netty: Cancel stream if interrupted during create

Previously streams were being partially orphaned if there was an
interruption during stream creation. To handle cancellation,
AbstractClientStream's cancel() had to be changed remove the
"optimization" otherwise, again, the stream would be orphaned.
This commit is contained in:
Eric Anderson 2015-01-29 17:30:44 -08:00
parent f12996086b
commit f3a90cd42b
5 changed files with 25 additions and 13 deletions

View File

@ -260,17 +260,14 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
@Override
public void cancel() {
outboundPhase(Phase.STATUS);
if (id() != null) {
// Only send a cancellation to remote side if we have actually been allocated
// a stream id and we are not already closed. i.e. the server side is aware of the stream.
sendCancel();
}
sendCancel();
dispose();
}
/**
* Send a stream cancellation message to the remote server. Can be called by either the
* application or transport layers.
* Cancel the stream and send a stream cancellation message to the remote server, if necessary.
* Can be called by either the application or transport layers. This method is safe to be called
* at any time and multiple times.
*/
protected abstract void sendCancel();

View File

@ -39,7 +39,8 @@ public interface ClientStream extends Stream {
/**
* Used to abnormally terminate the stream. After calling this method, no further messages will be
* sent or received, however it may still be possible to receive buffered messages for a brief
* period until {@link ClientStreamListener#closed} is called.
* period until {@link ClientStreamListener#closed} is called. This method is safe to be called
* at any time and multiple times.
*/
void cancel();

View File

@ -155,11 +155,11 @@ class NettyClientTransport extends AbstractClientTransport {
} catch (InterruptedException e) {
// Restore the interrupt.
Thread.currentThread().interrupt();
stream.dispose();
stream.cancel();
throw new RuntimeException(e);
} catch (ExecutionException e) {
stream.dispose();
throw new RuntimeException(e);
stream.cancel();
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
}
return stream;

View File

@ -47,6 +47,7 @@ import static org.mockito.Mockito.calls;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.grpc.Metadata;
@ -175,6 +176,19 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
verify(promise).setFailure(any(Throwable.class));
}
@Test
public void cancelBeforeStreamAssignedShouldSucceed() throws Exception {
handler.connection().local().maxStreams(0);
handler.write(ctx, new CreateStreamCommand(grpcHeaders, stream), promise);
mockContext();
verify(stream, never()).id(any(Integer.class));
when(stream.id()).thenReturn(null);
handler.write(ctx, new CancelStreamCommand(stream), promise);
verify(promise).setSuccess();
verifyNoMoreInteractions(ctx);
}
@Test
public void sendFrameShouldSucceed() throws Exception {
createStream();

View File

@ -97,9 +97,9 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
}
@Test
public void cancelShouldNotSendCommandIfStreamNotCreated() {
public void cancelShouldStillSendCommandIfStreamNotCreatedToCancelCreation() {
stream().cancel();
verify(channel, never()).writeAndFlush(any(CancelStreamCommand.class));
verify(channel).writeAndFlush(any(CancelStreamCommand.class));
}
@Test