Replace AUTHORITY_KEY with ClientStream.setAuthority

This commit is contained in:
Eric Anderson 2016-01-23 17:36:23 -08:00
parent 64bc830f65
commit 964963ab1a
18 changed files with 180 additions and 154 deletions

View File

@ -40,8 +40,6 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import io.grpc.internal.GrpcUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@ -251,11 +249,6 @@ public final class Metadata {
// One *2 for keys+values, one *2 to prevent resizing if a single key has multiple values
List<byte[]> serialized = new ArrayList<byte[]>(store.size() * 2 * 2);
for (Map.Entry<String, List<MetadataEntry>> keyEntry : store.entrySet()) {
// Intentionally skip this field on serialization. It must be handled special by the
// transport.
if (keyEntry.getKey().equals(GrpcUtil.AUTHORITY_KEY.name())) {
continue;
}
for (int i = 0; i < keyEntry.getValue().size(); i++) {
MetadataEntry entry = keyEntry.getValue().get(i);
byte[] asciiName;

View File

@ -41,7 +41,6 @@ import io.grpc.Status;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.NoopClientStream;
import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerStreamListener;
@ -484,12 +483,17 @@ class InProcessTransport implements ServerTransport, ClientTransport {
@Override
public void setMessageCompression(boolean enable) {}
@Override
public void setAuthority(String string) {
// TODO(ejona): Do something with this? Could be useful for testing, but can we "validate"
// it?
}
@Override
public void start(ClientStreamListener listener) {
serverStream.setListener(listener);
synchronized (InProcessTransport.this) {
headers.removeAll(GrpcUtil.AUTHORITY_KEY);
ServerStreamListener serverStreamListener = serverTransportListener.streamCreated(
serverStream, method.getFullMethodName(), headers);
clientStream.setListener(serverStreamListener);

View File

@ -35,7 +35,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_JOINER;
import static io.grpc.internal.GrpcUtil.AUTHORITY_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
@ -146,13 +145,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
@VisibleForTesting
static void prepareHeaders(Metadata headers, CallOptions callOptions, String userAgent,
DecompressorRegistry decompressorRegistry, Compressor compressor) {
// Hack to propagate authority. This should be properly pass to the transport.newStream
// somehow.
headers.removeAll(AUTHORITY_KEY);
if (callOptions.getAuthority() != null) {
headers.put(AUTHORITY_KEY, callOptions.getAuthority());
}
// Fill out the User-Agent header.
headers.removeAll(USER_AGENT_KEY);
if (userAgent != null) {
@ -238,6 +230,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
transportFuture.isDone() ? directExecutor() : callExecutor);
}
if (callOptions.getAuthority() != null) {
stream.setAuthority(callOptions.getAuthority());
}
stream.setCompressor(compressor);
if (compressor != Codec.Identity.NONE) {
stream.setMessageCompression(true);

View File

@ -60,6 +60,12 @@ public interface ClientStream extends Stream {
*/
void halfClose();
/**
* Override the default authority with {@code authority}. May only be called before {@link
* #start}.
*/
void setAuthority(String authority);
/**
* Starts stream. This method may only be called once. It is safe to do latent initialization of
* the stream up until {@link #start} is called.

View File

@ -60,6 +60,8 @@ class DelayedStream implements ClientStream {
// cancelled. This should be okay.
private volatile ClientStream startedRealStream;
@GuardedBy("this")
private String authority;
@GuardedBy("this")
private ClientStreamListener listener;
@GuardedBy("this")
private ClientStream realStream;
@ -90,6 +92,17 @@ class DelayedStream implements ClientStream {
}
}
@Override
public synchronized void setAuthority(String authority) {
checkState(listener == null, "must be called before start");
checkNotNull(authority, "authority");
if (realStream == null) {
this.authority = authority;
} else {
realStream.setAuthority(authority);
}
}
@Override
public void start(ClientStreamListener listener) {
synchronized (this) {

View File

@ -84,17 +84,6 @@ public final class GrpcUtil {
public static final Metadata.Key<String> MESSAGE_ACCEPT_ENCODING_KEY =
Metadata.Key.of(GrpcUtil.MESSAGE_ACCEPT_ENCODING, Metadata.ASCII_STRING_MARSHALLER);
/**
* {@link io.grpc.Metadata.Key} for the :authority pseudo header.
*
* <p> Don't actually serialized this.
*
* <p>TODO(carl-mastrangelo): This is a hack and should exist as shortly as possible. Remove it
* once a cleaner alternative exists (passing it directly into the transport, etc.)
*/
public static final Metadata.Key<String> AUTHORITY_KEY =
Metadata.Key.of("grpc-authority", Metadata.ASCII_STRING_MARSHALLER);
/**
* {@link io.grpc.Metadata.Key} for the Content-Type request/response header.
*/

View File

@ -43,6 +43,9 @@ import java.io.InputStream;
public class NoopClientStream implements ClientStream {
public static NoopClientStream INSTANCE = new NoopClientStream();
@Override
public void setAuthority(String authority) {}
@Override
public void start(ClientStreamListener listener) {}

View File

@ -282,15 +282,6 @@ public class MetadataTest {
assertNull(removed);
}
@Test
public void serializeSkipsAuthority() {
Metadata m = new Metadata();
m.put(GrpcUtil.AUTHORITY_KEY, "authority");
byte[][] values = m.serialize();
assertEquals(0, values.length);
}
@Test
public void keyEqualsHashNameWorks() {
Key<Integer> k1 = Key.of("case", Metadata.INTEGER_MARSHALLER);

View File

@ -263,6 +263,9 @@ public class AbstractClientStreamTest {
super(allocator, DEFAULT_MAX_MESSAGE_SIZE);
}
@Override
public void setAuthority(String authority) {}
@Override
public void request(int numMessages) {}

View File

@ -41,6 +41,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -169,13 +170,44 @@ public class ClientCallImplTest {
}
@Test
public void prepareHeaders_authorityAdded() {
Metadata m = new Metadata();
CallOptions callOptions = CallOptions.DEFAULT.withAuthority("auth");
ClientCallImpl.prepareHeaders(m, callOptions, "user agent", decompressorRegistry,
Codec.Identity.NONE);
public void authorityPropagatedToStream() {
final ClientTransport transport = mock(ClientTransport.class);
final ClientStream stream = mock(ClientStream.class);
when(provider.get(any(CallOptions.class))).thenReturn(Futures.immediateFuture(transport));
assertEquals(m.get(GrpcUtil.AUTHORITY_KEY), "auth");
when(transport.newStream(any(MethodDescriptor.class), any(Metadata.class))).thenReturn(stream);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
CallOptions.DEFAULT.withAuthority("overridden-authority"),
provider,
deadlineCancellationExecutor)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
verify(stream).setAuthority("overridden-authority");
}
@Test
public void authorityNotPropagatedToStream() {
final ClientTransport transport = mock(ClientTransport.class);
final ClientStream stream = mock(ClientStream.class);
when(provider.get(any(CallOptions.class))).thenReturn(Futures.immediateFuture(transport));
when(transport.newStream(any(MethodDescriptor.class), any(Metadata.class))).thenReturn(stream);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
// Don't provide an authority
CallOptions.DEFAULT,
provider,
deadlineCancellationExecutor)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
verify(stream, never()).setAuthority(any(String.class));
}
@Test
@ -247,7 +279,6 @@ public class ClientCallImplTest {
@Test
public void prepareHeaders_removeReservedHeaders() {
Metadata m = new Metadata();
m.put(GrpcUtil.AUTHORITY_KEY, "auth");
m.put(GrpcUtil.USER_AGENT_KEY, "user agent");
m.put(GrpcUtil.MESSAGE_ENCODING_KEY, "gzip");
m.put(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, "gzip");
@ -255,7 +286,6 @@ public class ClientCallImplTest {
ClientCallImpl.prepareHeaders(m, CallOptions.DEFAULT, null,
DecompressorRegistry.newEmptyInstance(), Codec.Identity.NONE);
assertNull(m.get(GrpcUtil.AUTHORITY_KEY));
assertNull(m.get(GrpcUtil.USER_AGENT_KEY));
assertNull(m.get(GrpcUtil.MESSAGE_ENCODING_KEY));
assertNull(m.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY));

View File

@ -35,6 +35,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.Http2ClientStream;
@ -45,6 +47,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString;
import javax.annotation.Nullable;
@ -52,30 +55,62 @@ import javax.annotation.Nullable;
* Client stream for a Netty transport.
*/
class NettyClientStream extends Http2ClientStream {
private final MethodDescriptor<?, ?> method;
/** {@code null} after start. */
private Metadata headers;
private final Channel channel;
private final NettyClientHandler handler;
private final Runnable startCallback;
private AsciiString authority;
private final AsciiString scheme;
private Http2Stream http2Stream;
private Integer id;
private WriteQueue writeQueue;
NettyClientStream(
Channel channel, NettyClientHandler handler, Runnable startCallback, int maxMessageSize) {
NettyClientStream(MethodDescriptor<?, ?> method, Metadata headers, Channel channel,
NettyClientHandler handler, int maxMessageSize, AsciiString authority, AsciiString scheme) {
super(new NettyWritableBufferAllocator(channel.alloc()), maxMessageSize);
this.method = checkNotNull(method, "method");
this.headers = checkNotNull(headers, "headers");
this.writeQueue = handler.getWriteQueue();
this.channel = checkNotNull(channel, "channel");
this.handler = checkNotNull(handler, "handler");
this.startCallback = checkNotNull(startCallback, "startCallback");
this.authority = checkNotNull(authority, "authority");
this.scheme = checkNotNull(scheme, "scheme");
}
@Override
public void setAuthority(String authority) {
checkState(listener() == null, "must be call before start");
this.authority = AsciiString.of(checkNotNull(authority, "authority"));
}
@Override
public void start(ClientStreamListener listener) {
super.start(listener);
startCallback.run();
// Convert the headers into Netty HTTP/2 headers.
AsciiString defaultPath = new AsciiString("/" + method.getFullMethodName());
Http2Headers http2Headers
= Utils.convertClientHeaders(headers, scheme, defaultPath, authority);
headers = null;
ChannelFutureListener failureListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// Stream creation failed. Close the stream if not already closed.
transportReportStatus(Utils.statusFromThrowable(future.cause()), true, new Metadata());
}
}
};
// Write the command requesting the creation of the stream.
writeQueue.enqueue(new CreateStreamCommand(http2Headers, this),
!method.getType().clientSendsOneMessage()).addListener(failureListener);
}
@Override
public void request(final int numMessages) {
public void request(int numMessages) {
if (channel.eventLoop().inEventLoop()) {
// Processing data read in the event loop so can call into the deframer immediately
requestMessagesFromDeframer(numMessages);

View File

@ -31,7 +31,6 @@
package io.grpc.netty;
import static io.grpc.internal.GrpcUtil.AUTHORITY_KEY;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
import com.google.common.base.Preconditions;
@ -49,7 +48,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.AsciiString;
import java.net.SocketAddress;
@ -103,42 +101,11 @@ class NettyClientTransport implements ClientTransport {
}
@Override
public ClientStream newStream(final MethodDescriptor<?, ?> method, final Metadata headers) {
public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers) {
Preconditions.checkNotNull(method, "method");
Preconditions.checkNotNull(headers, "headers");
// Convert the headers into Netty HTTP/2 headers.
AsciiString defaultPath = new AsciiString("/" + method.getFullMethodName());
AsciiString defaultAuthority = new AsciiString(headers.containsKey(AUTHORITY_KEY)
? headers.get(AUTHORITY_KEY) : authority);
headers.removeAll(AUTHORITY_KEY);
final Http2Headers http2Headers = Utils.convertClientHeaders(
headers, negotiationHandler.scheme(), defaultPath, defaultAuthority);
class StartCallback implements Runnable {
final NettyClientStream clientStream =
new NettyClientStream(channel, handler, this, maxMessageSize);
final ChannelFutureListener failureListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// Stream creation failed. Close the stream if not already closed.
clientStream.transportReportStatus(Utils.statusFromThrowable(future.cause()), true,
new Metadata());
}
}
};
@Override
public void run() {
// Write the command requesting the creation of the stream.
handler.getWriteQueue().enqueue(new CreateStreamCommand(http2Headers, clientStream),
!method.getType().clientSendsOneMessage()).addListener(failureListener);
}
}
return new StartCallback().clientStream;
return new NettyClientStream(method, headers, channel, handler, maxMessageSize, authority,
negotiationHandler.scheme());
}
@Override

View File

@ -31,7 +31,6 @@
package io.grpc.netty;
import static io.grpc.internal.GrpcUtil.AUTHORITY_KEY;
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
import static io.netty.util.CharsetUtil.UTF_8;
@ -109,25 +108,20 @@ class Utils {
public static Http2Headers convertClientHeaders(Metadata headers,
AsciiString scheme,
AsciiString defaultPath,
AsciiString defaultAuthority) {
AsciiString authority) {
Preconditions.checkNotNull(defaultPath, "defaultPath");
Preconditions.checkNotNull(defaultAuthority, "defaultAuthority");
Preconditions.checkNotNull(authority, "authority");
// Add any application-provided headers first.
Http2Headers http2Headers = convertMetadata(headers);
// Now set GRPC-specific default headers.
http2Headers.authority(defaultAuthority)
http2Headers.authority(authority)
.path(defaultPath)
.method(HTTP_METHOD)
.scheme(scheme)
.set(CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC)
.set(TE_HEADER, TE_TRAILERS);
// Override the default authority and path if provided by the headers.
if (headers.containsKey(AUTHORITY_KEY)) {
http2Headers.authority(new AsciiString(headers.get(AUTHORITY_KEY).getBytes(UTF_8)));
}
// Set the User-Agent header.
String userAgent = GrpcUtil.getGrpcUserAgent("netty", headers.get(USER_AGENT_KEY));
http2Headers.set(USER_AGENT, new AsciiString(userAgent.getBytes(UTF_8)));

View File

@ -53,6 +53,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener;
import io.netty.buffer.ByteBuf;
@ -85,6 +86,13 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
@Mock
protected NettyClientHandler handler;
@SuppressWarnings("unchecked")
private MethodDescriptor.Marshaller<Void> marshaller = mock(MethodDescriptor.Marshaller.class);
// Must be initialized before @Before, because it is used by createStream()
private MethodDescriptor<?, ?> methodDescriptor = MethodDescriptor.create(
MethodDescriptor.MethodType.UNARY, "/testService/test", marshaller, marshaller);
@Override
protected ClientStreamListener listener() {
return listener;
@ -346,12 +354,9 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
@Test
public void setHttp2StreamShouldNotifyReady() {
listener = mock(ClientStreamListener.class);
Runnable starter = new Runnable() {
@Override
public void run() {}
};
stream = new NettyClientStream(channel, handler, starter, DEFAULT_MAX_MESSAGE_SIZE);
stream = new NettyClientStream(methodDescriptor, new Metadata(), channel, handler,
DEFAULT_MAX_MESSAGE_SIZE, AsciiString.of("localhost"), AsciiString.of("http"));
stream.start(listener);
stream().id(STREAM_ID);
verify(listener, never()).onReady();
@ -374,12 +379,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
}
}).when(writeQueue).enqueue(any(), any(ChannelPromise.class), anyBoolean());
when(writeQueue.enqueue(any(), anyBoolean())).thenReturn(future);
Runnable starter = new Runnable() {
@Override
public void run() {}
};
NettyClientStream stream =
new NettyClientStream(channel, handler, starter, DEFAULT_MAX_MESSAGE_SIZE);
NettyClientStream stream = new NettyClientStream(methodDescriptor, new Metadata(), channel,
handler, DEFAULT_MAX_MESSAGE_SIZE, AsciiString.of("localhost"), AsciiString.of("http"));
stream.start(listener);
assertTrue(stream.canSend());
assertTrue(stream.canReceive());

View File

@ -31,7 +31,6 @@
package io.grpc.okhttp;
import static io.grpc.internal.GrpcUtil.AUTHORITY_KEY;
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
@ -64,10 +63,10 @@ public class Headers {
* application thread context.
*/
public static List<Header> createRequestHeaders(Metadata headers, String defaultPath,
String defaultAuthority) {
String authority) {
Preconditions.checkNotNull(headers, "headers");
Preconditions.checkNotNull(defaultPath, "defaultPath");
Preconditions.checkNotNull(defaultAuthority, "defaultAuthority");
Preconditions.checkNotNull(authority, "authority");
List<Header> okhttpHeaders = new ArrayList<Header>(6);
@ -75,9 +74,6 @@ public class Headers {
okhttpHeaders.add(SCHEME_HEADER);
okhttpHeaders.add(METHOD_HEADER);
String authority = headers.containsKey(AUTHORITY_KEY)
? headers.get(AUTHORITY_KEY) : defaultAuthority;
headers.removeAll(AUTHORITY_KEY);
okhttpHeaders.add(new Header(Header.TARGET_AUTHORITY, authority));
String path = defaultPath;
okhttpHeaders.add(new Header(Header.TARGET_PATH, path));

View File

@ -35,7 +35,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.Http2ClientStream;
@ -61,17 +62,18 @@ class OkHttpClientStream extends Http2ClientStream {
private static final Buffer EMPTY_BUFFER = new Buffer();
private final MethodType type;
@GuardedBy("lock")
private int window = Utils.DEFAULT_WINDOW_SIZE;
@GuardedBy("lock")
private int processedWindow = Utils.DEFAULT_WINDOW_SIZE;
private final MethodDescriptor<?, ?> method;
/** {@code null} iff start has been called. */
private Metadata headers;
private final AsyncFrameWriter frameWriter;
private final OutboundFlowController outboundFlow;
private final OkHttpClientTransport transport;
private final Runnable startCallback;
private final Object lock;
private String authority;
private Object outboundFlowState;
private volatile Integer id;
@GuardedBy("lock")
@ -86,29 +88,29 @@ class OkHttpClientStream extends Http2ClientStream {
private boolean cancelSent = false;
OkHttpClientStream(
MethodDescriptor<?, ?> method,
Metadata headers,
AsyncFrameWriter frameWriter,
OkHttpClientTransport transport,
Runnable startCallback,
OutboundFlowController outboundFlow,
MethodType type,
Object lock,
List<Header> requestHeaders,
int maxMessageSize) {
int maxMessageSize,
String authority) {
super(new OkHttpWritableBufferAllocator(), maxMessageSize);
this.method = method;
this.headers = headers;
this.frameWriter = frameWriter;
this.transport = transport;
this.startCallback = startCallback;
this.outboundFlow = outboundFlow;
this.type = type;
this.lock = lock;
this.requestHeaders = requestHeaders;
this.authority = authority;
}
/**
* Returns the type of this stream.
*/
public MethodType getType() {
return type;
public MethodDescriptor.MethodType getType() {
return method.getType();
}
@Override
@ -124,10 +126,22 @@ class OkHttpClientStream extends Http2ClientStream {
return id;
}
@Override
public void setAuthority(String authority) {
checkState(listener() == null, "must be call before start");
this.authority = checkNotNull(authority, "authority");
}
@Override
public void start(ClientStreamListener listener) {
super.start(listener);
startCallback.run();
String defaultPath = "/" + method.getFullMethodName();
List<Header> requestHeaders = Headers.createRequestHeaders(headers, defaultPath, authority);
headers = null;
synchronized (lock) {
this.requestHeaders = requestHeaders;
transport.streamReadyToStart(this);
}
}
@GuardedBy("lock")

View File

@ -245,29 +245,21 @@ class OkHttpClientTransport implements ClientTransport {
public OkHttpClientStream newStream(final MethodDescriptor<?, ?> method, final Metadata headers) {
Preconditions.checkNotNull(method, "method");
Preconditions.checkNotNull(headers, "headers");
return new OkHttpClientStream(method, headers, frameWriter, OkHttpClientTransport.this,
outboundFlow, lock, maxMessageSize, defaultAuthority);
}
final String defaultPath = "/" + method.getFullMethodName();
class StartCallback implements Runnable {
final OkHttpClientStream clientStream = new OkHttpClientStream(
frameWriter, OkHttpClientTransport.this, this, outboundFlow, method.getType(), lock,
Headers.createRequestHeaders(headers, defaultPath, defaultAuthority),
maxMessageSize);
@Override
public void run() {
synchronized (lock) {
if (goAway) {
clientStream.transportReportStatus(goAwayStatus, true, new Metadata());
} else if (streams.size() >= maxConcurrentStreams) {
pendingStreams.add(clientStream);
} else {
startStream(clientStream);
}
}
@GuardedBy("lock")
void streamReadyToStart(OkHttpClientStream clientStream) {
synchronized (lock) {
if (goAway) {
clientStream.transportReportStatus(goAwayStatus, true, new Metadata());
} else if (streams.size() >= maxConcurrentStreams) {
pendingStreams.add(clientStream);
} else {
startStream(clientStream);
}
}
return new StartCallback().clientStream;
}
@GuardedBy("lock")

View File

@ -37,11 +37,11 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener;
import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.Header;
import org.junit.Before;
import org.junit.Test;
@ -54,28 +54,28 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@RunWith(JUnit4.class)
public class OkHttpClientStreamTest {
private static final int MAX_MESSAGE_SIZE = 100;
@Mock private MethodDescriptor.Marshaller<Void> marshaller;
@Mock private AsyncFrameWriter frameWriter;
@Mock private OkHttpClientTransport transport;
@Mock private Runnable startCallback;
@Mock private OutboundFlowController flowController;
private final Object lock = new Object();
private final List<Header> requestHeaders = new ArrayList<Header>();
private MethodDescriptor<?, ?> methodDescriptor;
private OkHttpClientStream stream;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
stream = new OkHttpClientStream(frameWriter, transport, startCallback, flowController,
MethodType.UNARY, lock, requestHeaders, MAX_MESSAGE_SIZE);
methodDescriptor = MethodDescriptor.create(
MethodType.UNARY, "/testService/test", marshaller, marshaller);
stream = new OkHttpClientStream(methodDescriptor, new Metadata(), frameWriter, transport,
flowController, lock, MAX_MESSAGE_SIZE, "localhost");
}
@Test