Split out a ClientStreamListener from StreamListener.

Headers and trailers are only received on client-side, so we need a
client-specific listener. Close() also has slightly different semantics
between server-side and client-side.

Most of the changes are simple name changes, but AbstractServerStream does update to the new close() semantics.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=76094225
This commit is contained in:
ejona 2014-09-22 12:49:20 -07:00 committed by Eric Anderson
parent 9d50299a04
commit 6fc356b13d
20 changed files with 103 additions and 123 deletions

View File

@ -7,9 +7,9 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.ClientTransport;
import com.google.net.stubby.newtransport.ClientTransportFactory;
import com.google.net.stubby.newtransport.StreamListener;
import java.io.IOException;
import java.io.InputStream;
@ -139,7 +139,7 @@ public final class ChannelImpl extends AbstractService implements Channel {
public void start(Listener<RespT> observer, Metadata.Headers headers) {
Preconditions.checkState(stream == null, "Already started");
stream = obtainActiveTransport().newStream(method, headers,
new StreamListenerImpl(observer));
new ClientStreamListenerImpl(observer));
}
@Override
@ -191,10 +191,10 @@ public final class ChannelImpl extends AbstractService implements Channel {
}
}
private class StreamListenerImpl implements StreamListener {
private class ClientStreamListenerImpl implements ClientStreamListener {
private final Listener<RespT> observer;
public StreamListenerImpl(Listener<RespT> observer) {
public ClientStreamListenerImpl(Listener<RespT> observer) {
Preconditions.checkNotNull(observer);
this.observer = observer;
}

View File

@ -1,7 +1,7 @@
package com.google.net.stubby;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
@ -14,14 +14,14 @@ import java.io.InputStream;
*/
// TODO(user): Delete this class when new transport interfaces are introduced
public class SessionClientStream implements ClientStream {
private final StreamListener listener;
private final ClientStreamListener listener;
/**
* The {@link Request} used by the stub to dispatch the call
*/
private Request request;
private Response response;
public SessionClientStream(StreamListener listener) {
public SessionClientStream(ClientStreamListener listener) {
this.listener = listener;
}

View File

@ -2,8 +2,8 @@ package com.google.net.stubby;
import com.google.common.util.concurrent.AbstractService;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.ClientTransport;
import com.google.net.stubby.newtransport.StreamListener;
/**
* Shim between Session and Channel. Will be removed when Session is removed.
@ -28,7 +28,7 @@ public class SessionClientTransport extends AbstractService implements ClientTra
@Override
public ClientStream newStream(MethodDescriptor<?, ?> method,
Metadata.Headers headers,
StreamListener listener) {
ClientStreamListener listener) {
final SessionClientStream stream = new SessionClientStream(listener);
Request request = session.startRequest(method.getName(), headers,
stream.responseBuilder());

View File

@ -19,7 +19,7 @@ import javax.annotation.concurrent.GuardedBy;
*/
public abstract class AbstractClientStream extends AbstractStream implements ClientStream {
private final StreamListener listener;
private final ClientStreamListener listener;
@GuardedBy("stateLock")
private Status status;
@ -30,7 +30,7 @@ public abstract class AbstractClientStream extends AbstractStream implements Cli
private Status stashedStatus;
private Metadata.Trailers stashedTrailers;
protected AbstractClientStream(StreamListener listener) {
protected AbstractClientStream(ClientStreamListener listener) {
this.listener = Preconditions.checkNotNull(listener);
}

View File

@ -15,7 +15,7 @@ public abstract class AbstractClientTransport extends AbstractService implements
@Override
public final ClientStream newStream(MethodDescriptor<?, ?> method,
Metadata.Headers headers,
StreamListener listener) {
ClientStreamListener listener) {
Preconditions.checkNotNull(method, "method");
Preconditions.checkNotNull(listener, "listener");
if (state() == State.STARTING) {
@ -42,5 +42,5 @@ public abstract class AbstractClientTransport extends AbstractService implements
*/
protected abstract ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
Metadata.Headers headers,
StreamListener listener);
ClientStreamListener listener);
}

View File

@ -27,9 +27,8 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser
/** Whether listener.closed() has been called. */
@GuardedBy("stateLock")
private boolean listenerClosed;
/** Saved application status for notifying when graceful stream termination completes. */
@GuardedBy("stateLock")
private Status gracefulStatus;
/** Whether the stream was closed gracefull by the application (vs. a transport-level failure). */
private boolean gracefulClose;
/** Saved trailers from close() that need to be sent once the framer has sent all messages. */
private Metadata.Trailers stashedTrailers;
@ -58,13 +57,8 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser
Preconditions.checkState(!status.isOk() || state == WRITE_ONLY,
"Cannot close with OK before client half-closes");
state = CLOSED;
if (!listenerClosed) {
// Delay calling listener.closed() until the status has been flushed to the network (which
// is notified via complete()). Since there may be large buffers involved, the actual
// completion of the RPC could be much later than this call.
gracefulStatus = status;
}
}
gracefulClose = true;
trailers.removeAll(Status.CODE_KEY);
trailers.removeAll(Status.MESSAGE_KEY);
trailers.put(Status.CODE_KEY, status.getCode());
@ -109,24 +103,21 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser
* The Stream is considered completely closed and there is no further opportunity for error. It
* calls the listener's {@code closed()} if it was not already done by {@link #abortStream}. Note
* that it is expected that either {@code closed()} or {@code abortStream()} was previously
* called, as otherwise there is no status to provide to the listener.
* called, since {@code closed()} is required for a normal stream closure and {@code
* abortStream()} for abnormal.
*/
public void complete() {
Status status;
synchronized (stateLock) {
if (listenerClosed) {
return;
}
listenerClosed = true;
status = gracefulStatus;
gracefulStatus = null;
}
if (status == null) {
listener.closed(new Status(Transport.Code.INTERNAL, "successful complete() without close()"),
new Metadata.Trailers());
if (!gracefulClose) {
listener.closed(new Status(Transport.Code.INTERNAL, "successful complete() without close()"));
throw new IllegalStateException("successful complete() without close()");
}
listener.closed(status, new Metadata.Trailers());
listener.closed(Status.OK);
}
@Override
@ -179,7 +170,7 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser
dispose();
} finally {
if (closeListener) {
listener.closed(status, new Metadata.Trailers());
listener.closed(status);
}
}
}

View File

@ -0,0 +1,35 @@
package com.google.net.stubby.newtransport;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import javax.annotation.Nullable;
/** An observer of client-side stream events. */
public interface ClientStreamListener extends StreamListener {
/**
* Called upon receiving all header information from the remote end-point.
* <p>This method should return quickly, as the same thread may be used to process other streams.
*
* @param headers the fully buffered received headers.
* @return a processing completion future, or {@code null} to indicate that processing of the
* headers is immediately complete.
*/
@Nullable
ListenableFuture<Void> headersRead(Metadata.Headers headers);
/**
* Called when the stream is fully closed. {@link
* com.google.net.stubby.transport.Transport.Code#OK} is the only status code that is guaranteed
* to have been sent from the remote server. Any other status code may have been caused by
* abnormal stream termination. This is guaranteed to always be the final call on a listener. No
* further callbacks will be issued.
*
* <p>This method should return quickly, as the same thread may be used to process other streams.
*
* @param status details about the remote closure
* @param trailers trailing metadata
*/
void closed(Status status, Metadata.Trailers trailers);
}

View File

@ -19,8 +19,8 @@ public interface ClientTransport extends Service {
* TODO(user): Consider also throwing for stopping.
* <p>
* This method returns immediately and does not wait for any validation of the request. If
* creation fails for any reason, {@link StreamListener#closed} will be called to provide the
* error information. Any sent messages for this stream will be buffered until creation has
* creation fails for any reason, {@link ClientStreamListener#closed} will be called to provide
* the error information. Any sent messages for this stream will be buffered until creation has
* completed (either successfully or unsuccessfully).
*
* @param method the descriptor of the remote method to be called for this stream.
@ -30,5 +30,5 @@ public interface ClientTransport extends Service {
*/
ClientStream newStream(MethodDescriptor<?, ?> method,
Metadata.Headers headers,
StreamListener listener);
ClientStreamListener listener);
}

View File

@ -1,34 +0,0 @@
package com.google.net.stubby.newtransport;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import java.io.InputStream;
/**
* A decorator around another {@link StreamListener}.
*/
public class ForwardingStreamListener implements StreamListener {
private final StreamListener delegate;
public ForwardingStreamListener(StreamListener delegate) {
this.delegate = delegate;
}
@Override
public ListenableFuture<Void> headersRead(Metadata.Headers headers) {
return delegate.headersRead(headers);
}
@Override
public ListenableFuture<Void> messageRead(InputStream message, int length) {
return delegate.messageRead(message, length);
}
@Override
public void closed(Status status, Metadata.Trailers trailers) {
delegate.closed(status, trailers);
}
}

View File

@ -11,4 +11,19 @@ public interface ServerStreamListener extends StreamListener {
* <p>This method should return quickly, as the same thread may be used to process other streams.
*/
void halfClosed();
/**
* Called when the stream is fully closed. A status code of {@link
* com.google.net.stubby.transport.Transport.Code#OK} implies normal termination of the stream.
* Any other value implies abnormal termination. Since clients cannot send status, the passed
* status is always library-generated and only is concerned with transport-level stream shutdown
* (the call itself may have had a failing status, but if the stream terminated cleanly with the
* status appearing to have been sent, then the passed status here would be OK). This is
* guaranteed to always be the final call on a listener. No further callbacks will be issued.
*
* <p>This method should return quickly, as the same thread may be used to process other streams.
*
* @param status details about the remote closure
*/
void closed(Status status);
}

View File

@ -1,8 +1,6 @@
package com.google.net.stubby.newtransport;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import java.io.InputStream;
@ -13,18 +11,6 @@ import javax.annotation.Nullable;
* time.
*/
public interface StreamListener {
/**
* Called upon receiving all header information from the remote end-point.
* <p>This method should return quickly, as the same thread may be used to process other streams.
*
* @param headers the fully buffered received headers.
* @return a processing completion future, or {@code null} to indicate that processing of the
* headers is immediately complete.
*/
@Nullable
ListenableFuture<Void> headersRead(Metadata.Headers headers);
/**
* Called upon receiving a message from the remote end-point. The {@link InputStream} is
* non-blocking and contains the entire message.
@ -46,17 +32,4 @@ public interface StreamListener {
*/
@Nullable
ListenableFuture<Void> messageRead(InputStream message, int length);
/**
* Called when the stream is fully closed. A status code of {@link
* com.google.net.stubby.transport.Transport.Code#OK} implies normal termination of the stream.
* Any other value implies abnormal termination. This is guaranteed to always be the final call on
* a listener. No further callbacks will be issued.
*
* <p>This method should return quickly, as the same thread may be used to process other streams.
*
* @param status details about the remote closure
* @param trailers trailing metadata
*/
void closed(Status status, Metadata.Trailers trailers);
}

View File

@ -13,8 +13,8 @@ import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.AbstractClientStream;
import com.google.net.stubby.newtransport.AbstractClientTransport;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.InputStreamDeframer;
import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
@ -44,7 +44,7 @@ public class HttpClientTransport extends AbstractClientTransport {
@Override
protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
Metadata.Headers headers,
StreamListener listener) {
ClientStreamListener listener) {
URI uri = baseUri.resolve(method.getName());
HttpClientStream stream = new HttpClientStream(uri, headers.serializeAscii(), listener);
synchronized (streams) {
@ -83,7 +83,7 @@ public class HttpClientTransport extends AbstractClientTransport {
final DataOutputStream outputStream;
boolean connected;
HttpClientStream(URI uri, String[] headers, StreamListener listener) {
HttpClientStream(URI uri, String[] headers, ClientStreamListener listener) {
super(listener);
try {

View File

@ -8,10 +8,10 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.AbstractClientStream;
import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.GrpcDeframer;
import com.google.net.stubby.newtransport.HttpUtil;
import com.google.net.stubby.newtransport.MessageDeframer2;
import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.transport.Transport;
import io.netty.buffer.ByteBuf;
@ -40,7 +40,7 @@ class NettyClientStream extends AbstractClientStream implements NettyStream {
private boolean isGrpcResponse;
private StringBuilder nonGrpcErrorMessage = new StringBuilder();
NettyClientStream(StreamListener listener, Channel channel,
NettyClientStream(ClientStreamListener listener, Channel channel,
DefaultHttp2InboundFlowController inboundFlow) {
super(listener);
this.channel = Preconditions.checkNotNull(channel, "channel");

View File

@ -7,8 +7,8 @@ import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.newtransport.AbstractClientTransport;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.ClientTransport;
import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.newtransport.netty.NettyClientTransportFactory.NegotiationType;
import com.google.net.stubby.testing.utils.ssl.SslContextFactory;
@ -89,7 +89,7 @@ class NettyClientTransport extends AbstractClientTransport {
@Override
protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, Metadata.Headers headers,
StreamListener listener) {
ClientStreamListener listener) {
// Create the stream.
NettyClientStream stream = new NettyClientStream(listener, channel, handler.inboundFlow());

View File

@ -10,9 +10,9 @@ import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.AbstractClientStream;
import com.google.net.stubby.newtransport.AbstractClientTransport;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.ClientTransport;
import com.google.net.stubby.newtransport.InputStreamDeframer;
import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
import com.google.net.stubby.transport.Transport.Code;
@ -129,7 +129,7 @@ public class OkHttpClientTransport extends AbstractClientTransport {
@Override
protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
Metadata.Headers headers,
StreamListener listener) {
ClientStreamListener listener) {
return new OkHttpClientStream(method, headers, listener);
}
@ -403,7 +403,7 @@ public class OkHttpClientTransport extends AbstractClientTransport {
int unacknowledgedBytesRead;
OkHttpClientStream(MethodDescriptor<?, ?> method, Metadata.Headers headers,
StreamListener listener) {
ClientStreamListener listener) {
super(listener);
deframer = new InputStreamDeframer(inboundMessageHandler());
synchronized (lock) {

View File

@ -12,6 +12,7 @@ import static org.mockito.Mockito.verify;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
@ -23,12 +24,20 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
/**
* Tests for {@link NettyClientStream}.
*/
@RunWith(JUnit4.class)
public class NettyClientStreamTest extends NettyStreamTestBase {
@Mock
protected ClientStreamListener listener;
@Override
protected ClientStreamListener listener() {
return listener;
}
@Test
public void closeShouldSucceed() {

View File

@ -9,7 +9,6 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.notNull;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@ -159,7 +158,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
handler.channelRead(ctx, rstStreamFrame(STREAM_ID, Http2Error.CANCEL.code()));
verify(streamListener, never()).messageRead(any(InputStream.class), anyInt());
verify(streamListener).closed(eq(Status.CANCELLED), notNull(Metadata.Trailers.class));
verify(streamListener).closed(Status.CANCELLED);
verifyNoMoreInteractions(streamListener);
}

View File

@ -2,8 +2,6 @@ package com.google.net.stubby.newtransport.netty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.notNull;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@ -27,7 +25,6 @@ import org.mockito.Mock;
/** Unit tests for {@link NettyServerStream}. */
@RunWith(JUnit4.class)
public class NettyServerStreamTest extends NettyStreamTestBase {
@Mock
protected ServerStreamListener serverListener;
private Metadata.Trailers trailers = new Metadata.Trailers();
@ -61,7 +58,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
verifyZeroInteractions(serverListener);
// Sending complete. Listener gets closed()
stream().complete();
verify(serverListener).closed(eq(Status.CANCELLED), notNull(Metadata.Trailers.class));
verify(serverListener).closed(Status.OK);
assertEquals(StreamState.CLOSED, stream.state());
verifyZeroInteractions(serverListener);
}
@ -80,7 +77,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
new SendGrpcFrameCommand(STREAM_ID, statusFrame(Status.OK), true));
// Sending and receiving complete. Listener gets closed()
stream().complete();
verify(serverListener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
verify(serverListener).closed(Status.OK);
verifyNoMoreInteractions(serverListener);
}
@ -89,7 +86,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
Status status = new Status(Transport.Code.INTERNAL, new Throwable());
stream().abortStream(status, true);
assertEquals(StreamState.CLOSED, stream.state());
verify(serverListener).closed(same(status), notNull(Metadata.Trailers.class));
verify(serverListener).closed(same(status));
verify(channel).writeAndFlush(new SendGrpcFrameCommand(STREAM_ID, statusFrame(status), true));
verifyNoMoreInteractions(serverListener);
}
@ -99,7 +96,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
Status status = new Status(Transport.Code.INTERNAL, new Throwable());
stream().abortStream(status, false);
assertEquals(StreamState.CLOSED, stream.state());
verify(serverListener).closed(same(status), notNull(Metadata.Trailers.class));
verify(serverListener).closed(same(status));
verify(channel, never()).writeAndFlush(
new SendGrpcFrameCommand(STREAM_ID, statusFrame(status), true));
verifyNoMoreInteractions(serverListener);
@ -114,7 +111,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
verify(serverListener).halfClosed();
// Abort
stream().abortStream(status, true);
verify(serverListener).closed(same(status), notNull(Metadata.Trailers.class));
verify(serverListener).closed(same(status));
assertEquals(StreamState.CLOSED, stream.state());
verifyNoMoreInteractions(serverListener);
}

View File

@ -66,9 +66,6 @@ public abstract class NettyStreamTestBase {
@Mock
protected ChannelFuture future;
@Mock
protected StreamListener listener;
@Mock
protected Runnable accepted;
@ -137,9 +134,7 @@ public abstract class NettyStreamTestBase {
protected abstract NettyStream createStream();
protected StreamListener listener() {
return listener;
}
protected abstract StreamListener listener();
private String toString(InputStream in) throws Exception {
byte[] bytes = new byte[in.available()];

View File

@ -17,7 +17,7 @@ import com.google.common.util.concurrent.Service;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.okhttp.OkHttpClientTransport.ClientFrameHandler;
import com.google.net.stubby.newtransport.okhttp.OkHttpClientTransport.OkHttpClientStream;
import com.google.net.stubby.transport.Transport;
@ -437,7 +437,7 @@ public class OkHttpClientTransportTest {
}
}
private static class MockStreamListener implements StreamListener {
private static class MockStreamListener implements ClientStreamListener {
Status status;
CountDownLatch closed = new CountDownLatch(1);
ArrayList<String> messages = new ArrayList<String>();