mirror of https://github.com/grpc/grpc-java.git
Fix bug where the stream id would not get incremented for buffered streams.
This commit is contained in:
parent
9d214637e6
commit
e8afa3ca23
|
@ -68,6 +68,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
|
|||
private Throwable connectionError;
|
||||
private Status goAwayStatus;
|
||||
private ChannelHandlerContext ctx;
|
||||
private int nextStreamId;
|
||||
|
||||
public NettyClientHandler(Http2ConnectionEncoder encoder, Http2Connection connection,
|
||||
Http2FrameReader frameReader, Http2LocalFlowController inboundFlow,
|
||||
|
@ -84,6 +85,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
|
|||
// Disallow stream creation by the server.
|
||||
connection.remote().maxActiveStreams(0);
|
||||
connection.local().allowPushTo(false);
|
||||
nextStreamId = connection.local().nextStreamId();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
|
@ -232,7 +234,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
|
|||
*/
|
||||
private void createStream(CreateStreamCommand command, final ChannelPromise promise) {
|
||||
final Http2Connection.Endpoint<Http2LocalFlowController> local = connection().local();
|
||||
final int streamId = local.nextStreamId();
|
||||
final int streamId = getAndIncrementNextStreamId();
|
||||
final NettyClientStream stream = command.stream();
|
||||
final Http2Headers headers = command.headers();
|
||||
// TODO: Send GO_AWAY if streamId overflows
|
||||
|
@ -339,6 +341,12 @@ class NettyClientHandler extends Http2ConnectionHandler {
|
|||
return connection().activeStreams().toArray(new Http2Stream[0]);
|
||||
}
|
||||
|
||||
private int getAndIncrementNextStreamId() {
|
||||
int id = nextStreamId;
|
||||
nextStreamId += 2;
|
||||
return id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the connection window if we haven't already.
|
||||
*/
|
||||
|
|
|
@ -359,6 +359,17 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
|
|||
handler.decoder().flowController().windowSize(handler.connection().connectionStream()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createIncrementsIdsForActualAndBufferdStreams() throws Exception {
|
||||
receiveMaxConcurrentStreams(2);
|
||||
handler.write(ctx, new CreateStreamCommand(grpcHeaders, stream), promise);
|
||||
verify(stream).id(eq(3));
|
||||
handler.write(ctx, new CreateStreamCommand(grpcHeaders, stream), promise);
|
||||
verify(stream).id(eq(5));
|
||||
handler.write(ctx, new CreateStreamCommand(grpcHeaders, stream), promise);
|
||||
verify(stream).id(eq(7));
|
||||
}
|
||||
|
||||
private void receiveMaxConcurrentStreams(int max) throws Exception {
|
||||
ByteBuf serializedSettings = serializeSettings(new Http2Settings().maxConcurrentStreams(max));
|
||||
handler.channelRead(ctx, serializedSettings);
|
||||
|
|
Loading…
Reference in New Issue