diff --git a/benchmarks/src/jmh/java/io/grpc/netty/InboundHeadersBenchmark.java b/benchmarks/src/jmh/java/io/grpc/netty/InboundHeadersBenchmark.java index 66c0accd01..007787b2b5 100644 --- a/benchmarks/src/jmh/java/io/grpc/netty/InboundHeadersBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/netty/InboundHeadersBenchmark.java @@ -35,8 +35,8 @@ import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER; import static io.grpc.netty.Utils.TE_TRAILERS; import static io.netty.util.AsciiString.of; -import io.grpc.netty.GrpcHttp2HeadersDecoder.GrpcHttp2RequestHeaders; -import io.grpc.netty.GrpcHttp2HeadersDecoder.GrpcHttp2ResponseHeaders; +import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2RequestHeaders; +import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ResponseHeaders; import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http2.Http2Headers; import io.netty.util.AsciiString; diff --git a/build.gradle b/build.gradle index 93ac118169..54a4c94b4e 100644 --- a/build.gradle +++ b/build.gradle @@ -86,6 +86,7 @@ subprojects { protocPluginBaseName = 'protoc-gen-grpc-java' javaPluginPath = "$rootDir/compiler/build/exe/java_plugin/$protocPluginBaseName$exeSuffix" + nettyVersion = '4.1.11.Final' guavaVersion = '19.0' protobufVersion = '3.2.0' protobufNanoVersion = '3.0.0-alpha-5' @@ -191,10 +192,10 @@ subprojects { protobuf_plugin: 'com.google.protobuf:protobuf-gradle-plugin:0.8.0', protobuf_util: "com.google.protobuf:protobuf-java-util:${protobufVersion}", - netty: 'io.netty:netty-codec-http2:[4.1.8.Final]', - netty_epoll: 'io.netty:netty-transport-native-epoll:4.1.8.Final' + epoll_suffix, - netty_proxy_handler: 'io.netty:netty-handler-proxy:4.1.8.Final', - netty_tcnative: 'io.netty:netty-tcnative-boringssl-static:1.1.33.Fork26', + netty: "io.netty:netty-codec-http2:[${nettyVersion}]", + netty_epoll: "io.netty:netty-transport-native-epoll:${nettyVersion}" + epoll_suffix, + netty_proxy_handler: "io.netty:netty-handler-proxy:${nettyVersion}", + netty_tcnative: 'io.netty:netty-tcnative-boringssl-static:2.0.1.Final', // Test dependencies. junit: 'junit:junit:4.11', diff --git a/netty/src/main/java/io/grpc/netty/GrpcHttp2HeadersDecoder.java b/netty/src/main/java/io/grpc/netty/GrpcHttp2HeadersUtils.java similarity index 85% rename from netty/src/main/java/io/grpc/netty/GrpcHttp2HeadersDecoder.java rename to netty/src/main/java/io/grpc/netty/GrpcHttp2HeadersUtils.java index e3183d1e74..ec07bd8266 100644 --- a/netty/src/main/java/io/grpc/netty/GrpcHttp2HeadersDecoder.java +++ b/netty/src/main/java/io/grpc/netty/GrpcHttp2HeadersUtils.java @@ -55,12 +55,8 @@ import static io.netty.util.AsciiString.isUpperCase; import com.google.common.io.BaseEncoding; import io.grpc.Metadata; -import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder; -import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Headers; -import io.netty.handler.codec.http2.Http2HeadersDecoder; -import io.netty.handler.codec.http2.internal.hpack.Decoder; import io.netty.util.AsciiString; import io.netty.util.internal.PlatformDependent; import java.util.ArrayList; @@ -68,88 +64,30 @@ import java.util.Collections; import java.util.List; /** - * A {@link Http2HeadersDecoder} that allows to use custom {@link Http2Headers} implementations. - * - *

Some of the code is copied from Netty's {@link DefaultHttp2HeadersDecoder}. + * A headers utils providing custom gRPC implementations of {@link DefaultHttp2HeadersDecoder}. */ -abstract class GrpcHttp2HeadersDecoder implements Http2HeadersDecoder, - Http2HeadersDecoder.Configuration { - - private static final float HEADERS_COUNT_WEIGHT_NEW = 1 / 5f; - private static final float HEADERS_COUNT_WEIGHT_HISTORICAL = 1 - HEADERS_COUNT_WEIGHT_NEW; - - private final Decoder decoder; - private float numHeadersGuess = 8; - - GrpcHttp2HeadersDecoder(long maxHeaderListSize) { - decoder = new Decoder(maxHeaderListSize, 32 /* same as default */); - } - - @Override - public Http2Headers decodeHeaders(int streamId, ByteBuf headerBlock) throws Http2Exception { - GrpcHttp2InboundHeaders headers = newHeaders(1 + (int) numHeadersGuess); - decoder.decode(streamId, headerBlock, headers); - - numHeadersGuess = HEADERS_COUNT_WEIGHT_NEW * headers.numHeaders() - + HEADERS_COUNT_WEIGHT_HISTORICAL * numHeadersGuess; - - return headers; - } - - abstract GrpcHttp2InboundHeaders newHeaders(int numHeadersGuess); - - @Override - public Configuration configuration() { - return this; - } - - @Override - public long maxHeaderListSize() { - return decoder.getMaxHeaderListSize(); - } - - @Override - public void maxHeaderListSize(long maxHeaderListSize, long maxHeaderListSizeGoAway) - throws Http2Exception { - decoder.setMaxHeaderListSize(maxHeaderListSize, maxHeaderListSizeGoAway); - } - - @Override - public long maxHeaderListSizeGoAway() { - return decoder.getMaxHeaderListSizeGoAway(); - } - - @Override - public long maxHeaderTableSize() { - return decoder.getMaxHeaderTableSize(); - } - - @Override - public void maxHeaderTableSize(long max) throws Http2Exception { - decoder.setMaxHeaderTableSize(max); - } - - static final class GrpcHttp2ServerHeadersDecoder extends GrpcHttp2HeadersDecoder { +class GrpcHttp2HeadersUtils { + static final class GrpcHttp2ServerHeadersDecoder extends DefaultHttp2HeadersDecoder { GrpcHttp2ServerHeadersDecoder(long maxHeaderListSize) { - super(maxHeaderListSize); + super(true, maxHeaderListSize); } @Override - GrpcHttp2InboundHeaders newHeaders(int numHeadersGuess) { - return new GrpcHttp2RequestHeaders(numHeadersGuess); + protected GrpcHttp2InboundHeaders newHeaders() { + return new GrpcHttp2RequestHeaders(numberOfHeadersGuess()); } } - static final class GrpcHttp2ClientHeadersDecoder extends GrpcHttp2HeadersDecoder { + static final class GrpcHttp2ClientHeadersDecoder extends DefaultHttp2HeadersDecoder { GrpcHttp2ClientHeadersDecoder(long maxHeaderListSize) { - super(maxHeaderListSize); + super(true, maxHeaderListSize); } @Override - GrpcHttp2InboundHeaders newHeaders(int numHeadersGuess) { - return new GrpcHttp2ResponseHeaders(numHeadersGuess); + protected GrpcHttp2InboundHeaders newHeaders() { + return new GrpcHttp2ResponseHeaders(numberOfHeadersGuess()); } } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 9b3487c78f..d41daecd54 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -46,7 +46,7 @@ import io.grpc.internal.ClientTransport.PingCallback; import io.grpc.internal.GrpcUtil; import io.grpc.internal.Http2Ping; import io.grpc.internal.KeepAliveManager; -import io.grpc.netty.GrpcHttp2HeadersDecoder.GrpcHttp2ClientHeadersDecoder; +import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -56,6 +56,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.DefaultHttp2Connection; +import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; @@ -149,8 +150,7 @@ class NettyClientHandler extends AbstractNettyHandler { connection.local().flowController( new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true)); - // TODO(ejona): swap back to DefaultHttp2Connection with Netty-4.1.9 - Http2ConnectionDecoder decoder = new FixedHttp2ConnectionDecoder(connection, encoder, + Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader); Http2Settings settings = new Http2Settings(); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 14aa8c1be1..4cb787dee5 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -55,7 +55,7 @@ import io.grpc.internal.KeepAliveManager; import io.grpc.internal.LogExceptionRunnable; import io.grpc.internal.ServerTransportListener; import io.grpc.internal.StatsTraceContext; -import io.grpc.netty.GrpcHttp2HeadersDecoder.GrpcHttp2ServerHeadersDecoder; +import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelFuture; @@ -64,6 +64,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2Connection; +import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; @@ -188,8 +189,7 @@ class NettyServerHandler extends AbstractNettyHandler { frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer); Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter); - // TODO(ejona): swap back to DefaultHttp2Connection with Netty-4.1.9 - Http2ConnectionDecoder decoder = new FixedHttp2ConnectionDecoder(connection, encoder, + Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader); Http2Settings settings = new Http2Settings(); diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java index bc30e2a252..b4a7358915 100644 --- a/netty/src/main/java/io/grpc/netty/Utils.java +++ b/netty/src/main/java/io/grpc/netty/Utils.java @@ -44,7 +44,7 @@ import io.grpc.Metadata; import io.grpc.Status; import io.grpc.internal.GrpcUtil; import io.grpc.internal.SharedResourceHolder.Resource; -import io.grpc.netty.GrpcHttp2HeadersDecoder.GrpcHttp2InboundHeaders; +import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2InboundHeaders; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.codec.http2.Http2Exception; diff --git a/netty/src/test/java/io/grpc/netty/GrpcHttp2HeadersDecoderTest.java b/netty/src/test/java/io/grpc/netty/GrpcHttp2HeadersUtilsTest.java similarity index 96% rename from netty/src/test/java/io/grpc/netty/GrpcHttp2HeadersDecoderTest.java rename to netty/src/test/java/io/grpc/netty/GrpcHttp2HeadersUtilsTest.java index 72304c6b92..7935dc73b5 100644 --- a/netty/src/test/java/io/grpc/netty/GrpcHttp2HeadersDecoderTest.java +++ b/netty/src/test/java/io/grpc/netty/GrpcHttp2HeadersUtilsTest.java @@ -37,8 +37,8 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import io.grpc.netty.GrpcHttp2HeadersDecoder.GrpcHttp2ClientHeadersDecoder; -import io.grpc.netty.GrpcHttp2HeadersDecoder.GrpcHttp2ServerHeadersDecoder; +import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; +import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http2.DefaultHttp2Headers; @@ -54,10 +54,10 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** - * Tests for {@link GrpcHttp2HeadersDecoder}. + * Tests for {@link GrpcHttp2HeadersUtils}. */ @RunWith(JUnit4.class) -public class GrpcHttp2HeadersDecoderTest { +public class GrpcHttp2HeadersUtilsTest { private static final SensitivityDetector NEVER_SENSITIVE = new SensitivityDetector() { @Override diff --git a/netty/src/test/java/io/grpc/netty/GrpcHttp2InboundHeadersTest.java b/netty/src/test/java/io/grpc/netty/GrpcHttp2InboundHeadersTest.java index 971c23a935..b60b74e4c0 100644 --- a/netty/src/test/java/io/grpc/netty/GrpcHttp2InboundHeadersTest.java +++ b/netty/src/test/java/io/grpc/netty/GrpcHttp2InboundHeadersTest.java @@ -37,9 +37,9 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import com.google.common.io.BaseEncoding; -import io.grpc.netty.GrpcHttp2HeadersDecoder.GrpcHttp2InboundHeaders; -import io.grpc.netty.GrpcHttp2HeadersDecoder.GrpcHttp2RequestHeaders; -import io.grpc.netty.GrpcHttp2HeadersDecoder.GrpcHttp2ResponseHeaders; +import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2InboundHeaders; +import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2RequestHeaders; +import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ResponseHeaders; import io.netty.handler.codec.http2.Http2Headers; import io.netty.util.AsciiString; import java.util.Random; diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index d2d160a1b3..94aa60a876 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -70,7 +70,7 @@ import io.grpc.internal.ClientTransport.PingCallback; import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveManager; import io.grpc.internal.StatsTraceContext; -import io.grpc.netty.GrpcHttp2HeadersDecoder.GrpcHttp2ClientHeadersDecoder; +import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; diff --git a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java index 6161c4a58b..0b8126e69e 100644 --- a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java +++ b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java @@ -62,6 +62,7 @@ import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2FrameReader; import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2HeadersDecoder; import io.netty.handler.codec.http2.Http2LocalFlowController; import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2Stream; @@ -101,7 +102,7 @@ public abstract class NettyHandlerTestBase { /** * Must be called by subclasses to initialize the handler and channel. */ - protected final void initChannel(GrpcHttp2HeadersDecoder headersDecoder) throws Exception { + protected final void initChannel(Http2HeadersDecoder headersDecoder) throws Exception { content = Unpooled.copiedBuffer("hello world", UTF_8); frameWriter = spy(new DefaultHttp2FrameWriter()); frameReader = new DefaultHttp2FrameReader(headersDecoder); diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index be36c4bc13..2ddc394a5d 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -79,7 +79,7 @@ import io.grpc.internal.ServerStream; import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerTransportListener; import io.grpc.internal.StatsTraceContext; -import io.grpc.netty.GrpcHttp2HeadersDecoder.GrpcHttp2ServerHeadersDecoder; +import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; diff --git a/netty/third_party/netty/java/io/grpc/netty/FixedHttp2ConnectionDecoder.java b/netty/third_party/netty/java/io/grpc/netty/FixedHttp2ConnectionDecoder.java deleted file mode 100644 index fb50245edf..0000000000 --- a/netty/third_party/netty/java/io/grpc/netty/FixedHttp2ConnectionDecoder.java +++ /dev/null @@ -1,711 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package io.grpc.netty; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http2.*; -import io.netty.handler.codec.http2.Http2Connection.Endpoint; -import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException; -import io.netty.util.internal.UnstableApi; -import io.netty.util.internal.logging.InternalLogger; -import io.netty.util.internal.logging.InternalLoggerFactory; - -import java.util.List; - -import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; -import static io.netty.handler.codec.http2.Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS; -import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; -import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; -import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED; -import static io.netty.handler.codec.http2.Http2Exception.connectionError; -import static io.netty.handler.codec.http2.Http2Exception.streamError; -import static io.netty.handler.codec.http2.Http2PromisedRequestVerifier.ALWAYS_VERIFY; -import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED; -import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE; -import static io.netty.util.internal.ObjectUtil.checkNotNull; -import static java.lang.Integer.MAX_VALUE; -import static java.lang.Math.min; - -/** - * Provides the default implementation for processing inbound frame events and delegates to a - * {@link Http2FrameListener} - *

- * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener} - *

- * This interface enforces inbound flow control functionality through - * {@link Http2LocalFlowController} - */ -@UnstableApi -class FixedHttp2ConnectionDecoder implements Http2ConnectionDecoder { - private static final InternalLogger logger = InternalLoggerFactory.getInstance(FixedHttp2ConnectionDecoder.class); - private Http2FrameListener internalFrameListener = new PrefaceFrameListener(); - private final Http2Connection connection; - private Http2LifecycleManager lifecycleManager; - private final Http2ConnectionEncoder encoder; - private final Http2FrameReader frameReader; - private Http2FrameListener listener; - private final Http2PromisedRequestVerifier requestVerifier; - - public FixedHttp2ConnectionDecoder(Http2Connection connection, - Http2ConnectionEncoder encoder, - Http2FrameReader frameReader) { - this(connection, encoder, frameReader, ALWAYS_VERIFY); - } - - public FixedHttp2ConnectionDecoder(Http2Connection connection, - Http2ConnectionEncoder encoder, - Http2FrameReader frameReader, - Http2PromisedRequestVerifier requestVerifier) { - this.connection = checkNotNull(connection, "connection"); - this.frameReader = checkNotNull(frameReader, "frameReader"); - this.encoder = checkNotNull(encoder, "encoder"); - this.requestVerifier = checkNotNull(requestVerifier, "requestVerifier"); - if (connection.local().flowController() == null) { - connection.local().flowController(new DefaultHttp2LocalFlowController(connection)); - } - connection.local().flowController().frameWriter(encoder.frameWriter()); - } - - @Override - public void lifecycleManager(Http2LifecycleManager lifecycleManager) { - this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager"); - } - - @Override - public Http2Connection connection() { - return connection; - } - - @Override - public final Http2LocalFlowController flowController() { - return connection.local().flowController(); - } - - @Override - public void frameListener(Http2FrameListener listener) { - this.listener = checkNotNull(listener, "listener"); - } - - @Override - public Http2FrameListener frameListener() { - return listener; - } - - // Visible for testing - Http2FrameListener internalFrameListener() { - return internalFrameListener; - } - - @Override - public boolean prefaceReceived() { - return FrameReadListener.class == internalFrameListener.getClass(); - } - - @Override - public void decodeFrame(ChannelHandlerContext ctx, ByteBuf in, List out) throws Http2Exception { - frameReader.readFrame(ctx, in, internalFrameListener); - } - - @Override - public Http2Settings localSettings() { - Http2Settings settings = new Http2Settings(); - Http2FrameReader.Configuration config = frameReader.configuration(); - Http2HeadersDecoder.Configuration headersConfig = config.headersConfiguration(); - Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy(); - settings.initialWindowSize(flowController().initialWindowSize()); - settings.maxConcurrentStreams(connection.remote().maxActiveStreams()); - settings.headerTableSize(headersConfig.maxHeaderTableSize()); - settings.maxFrameSize(frameSizePolicy.maxFrameSize()); - settings.maxHeaderListSize(headersConfig.maxHeaderListSize()); - if (!connection.isServer()) { - // Only set the pushEnabled flag if this is a client endpoint. - settings.pushEnabled(connection.local().allowPushTo()); - } - return settings; - } - - @Override - public void close() { - frameReader.close(); - } - - /** - * Calculate the threshold in bytes which should trigger a {@code GO_AWAY} if a set of headers exceeds this amount. - * @param maxHeaderListSize - * SETTINGS_MAX_HEADER_LIST_SIZE for the local - * endpoint. - * @return the threshold in bytes which should trigger a {@code GO_AWAY} if a set of headers exceeds this amount. - */ - protected long calculateMaxHeaderListSizeGoAway(long maxHeaderListSize) { - return Http2CodecUtil.calculateMaxHeaderListSizeGoAway(maxHeaderListSize); - } - - private int unconsumedBytes(Http2Stream stream) { - return flowController().unconsumedBytes(stream); - } - - void onGoAwayRead0(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) - throws Http2Exception { - if (connection.goAwayReceived() && connection.local().lastStreamKnownByPeer() < lastStreamId) { - throw connectionError(PROTOCOL_ERROR, "lastStreamId MUST NOT increase. Current value: %d new value: %d", - connection.local().lastStreamKnownByPeer(), lastStreamId); - } - listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData); - connection.goAwayReceived(lastStreamId, errorCode, debugData); - } - - void onUnknownFrame0(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, - ByteBuf payload) throws Http2Exception { - listener.onUnknownFrame(ctx, frameType, streamId, flags, payload); - } - - /** - * Handles all inbound frames from the network. - */ - private final class FrameReadListener implements Http2FrameListener { - @Override - public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, - boolean endOfStream) throws Http2Exception { - Http2Stream stream = connection.stream(streamId); - Http2LocalFlowController flowController = flowController(); - int bytesToReturn = data.readableBytes() + padding; - - final boolean shouldIgnore; - try { - shouldIgnore = shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, "DATA"); - } catch (Http2Exception e) { - // Ignoring this frame. We still need to count the frame towards the connection flow control - // window, but we immediately mark all bytes as consumed. - flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream); - flowController.consumeBytes(stream, bytesToReturn); - throw e; - } catch (Throwable t) { - throw connectionError(INTERNAL_ERROR, t, "Unhandled error on data stream id %d", streamId); - } - - if (shouldIgnore) { - // Ignoring this frame. We still need to count the frame towards the connection flow control - // window, but we immediately mark all bytes as consumed. - flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream); - flowController.consumeBytes(stream, bytesToReturn); - - // Verify that the stream may have existed after we apply flow control. - verifyStreamMayHaveExisted(streamId); - - // All bytes have been consumed. - return bytesToReturn; - } - - Http2Exception error = null; - switch (stream.state()) { - case OPEN: - case HALF_CLOSED_LOCAL: - break; - case HALF_CLOSED_REMOTE: - case CLOSED: - error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s", - stream.id(), stream.state()); - break; - default: - error = streamError(stream.id(), PROTOCOL_ERROR, - "Stream %d in unexpected state: %s", stream.id(), stream.state()); - break; - } - - int unconsumedBytes = unconsumedBytes(stream); - try { - flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream); - // Update the unconsumed bytes after flow control is applied. - unconsumedBytes = unconsumedBytes(stream); - - // If the stream is in an invalid state to receive the frame, throw the error. - if (error != null) { - throw error; - } - - // Call back the application and retrieve the number of bytes that have been - // immediately processed. - bytesToReturn = listener.onDataRead(ctx, streamId, data, padding, endOfStream); - return bytesToReturn; - } catch (Http2Exception e) { - // If an exception happened during delivery, the listener may have returned part - // of the bytes before the error occurred. If that's the case, subtract that from - // the total processed bytes so that we don't return too many bytes. - int delta = unconsumedBytes - unconsumedBytes(stream); - bytesToReturn -= delta; - throw e; - } catch (RuntimeException e) { - // If an exception happened during delivery, the listener may have returned part - // of the bytes before the error occurred. If that's the case, subtract that from - // the total processed bytes so that we don't return too many bytes. - int delta = unconsumedBytes - unconsumedBytes(stream); - bytesToReturn -= delta; - throw e; - } finally { - // If appropriate, return the processed bytes to the flow controller. - flowController.consumeBytes(stream, bytesToReturn); - - if (endOfStream) { - lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture()); - } - } - } - - @Override - public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, - boolean endOfStream) throws Http2Exception { - onHeadersRead(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endOfStream); - } - - @Override - public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, - short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception { - Http2Stream stream = connection.stream(streamId); - boolean allowHalfClosedRemote = false; - if (stream == null && !connection.streamMayHaveExisted(streamId)) { - stream = connection.remote().createStream(streamId, endOfStream); - // Allow the state to be HALF_CLOSE_REMOTE if we're creating it in that state. - allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE; - } - - if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, "HEADERS")) { - return; - } - - switch (stream.state()) { - case RESERVED_REMOTE: - stream.open(endOfStream); - break; - case OPEN: - case HALF_CLOSED_LOCAL: - // Allowed to receive headers in these states. - break; - case HALF_CLOSED_REMOTE: - if (!allowHalfClosedRemote) { - throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s", - stream.id(), stream.state()); - } - break; - case CLOSED: - throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s", - stream.id(), stream.state()); - default: - // Connection error. - throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(), - stream.state()); - } - - try { - // This call will create a stream for streamDependency if necessary. - // For this reason it must be done before notifying the listener. - stream.setPriority(streamDependency, weight, exclusive); - } catch (ClosedStreamCreationException ignored) { - // It is possible that either the stream for this frame or the parent stream is closed. - // In this case we should ignore the exception and allow the frame to be sent. - } - - listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endOfStream); - - // If the headers completes this stream, close it. - if (endOfStream) { - lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture()); - } - } - - @Override - public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, - boolean exclusive) throws Http2Exception { - Http2Stream stream = connection.stream(streamId); - - try { - if (stream == null) { - if (connection.streamMayHaveExisted(streamId)) { - logger.info("{} ignoring PRIORITY frame for stream {}. Stream doesn't exist but may " + - " have existed", ctx.channel(), streamId); - return; - } - - // PRIORITY frames always identify a stream. This means that if a PRIORITY frame is the - // first frame to be received for a stream that we must create the stream. - stream = connection.remote().createIdleStream(streamId); - } else if (streamCreatedAfterGoAwaySent(streamId)) { - logger.info("{} ignoring PRIORITY frame for stream {}. Stream created after GOAWAY sent. " + - "Last known stream by peer {}", - ctx.channel(), streamId, connection.remote().lastStreamKnownByPeer()); - return; - } - - // This call will create a stream for streamDependency if necessary. - // For this reason it must be done before notifying the listener. - stream.setPriority(streamDependency, weight, exclusive); - } catch (ClosedStreamCreationException ignored) { - // It is possible that either the stream for this frame or the parent stream is closed. - // In this case we should ignore the exception and allow the frame to be sent. - } - - listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive); - } - - @Override - public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception { - Http2Stream stream = connection.stream(streamId); - if (stream == null) { - verifyStreamMayHaveExisted(streamId); - return; - } - - switch(stream.state()) { - case IDLE: - throw connectionError(PROTOCOL_ERROR, "RST_STREAM received for IDLE stream %d", streamId); - case CLOSED: - return; // RST_STREAM frames must be ignored for closed streams. - default: - break; - } - - listener.onRstStreamRead(ctx, streamId, errorCode); - - lifecycleManager.closeStream(stream, ctx.newSucceededFuture()); - } - - @Override - public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception { - // Apply oldest outstanding local settings here. This is a synchronization point between endpoints. - Http2Settings settings = encoder.pollSentSettings(); - - if (settings != null) { - applyLocalSettings(settings); - } - - listener.onSettingsAckRead(ctx); - } - - /** - * Applies settings sent from the local endpoint. - *

- * This method is only called after the local settings have been acknowledged from the remote endpoint. - */ - private void applyLocalSettings(Http2Settings settings) throws Http2Exception { - Boolean pushEnabled = settings.pushEnabled(); - final Http2FrameReader.Configuration config = frameReader.configuration(); - final Http2HeadersDecoder.Configuration headerConfig = config.headersConfiguration(); - final Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy(); - if (pushEnabled != null) { - if (connection.isServer()) { - throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified"); - } - connection.local().allowPushTo(pushEnabled); - } - - Long maxConcurrentStreams = settings.maxConcurrentStreams(); - if (maxConcurrentStreams != null) { - int value = (int) min(maxConcurrentStreams, MAX_VALUE); - connection.remote().maxStreams(value, calculateMaxStreams(value)); - } - - Long headerTableSize = settings.headerTableSize(); - if (headerTableSize != null) { - headerConfig.maxHeaderTableSize(headerTableSize); - } - - Long maxHeaderListSize = settings.maxHeaderListSize(); - if (maxHeaderListSize != null) { - headerConfig.maxHeaderListSize(maxHeaderListSize, calculateMaxHeaderListSizeGoAway(maxHeaderListSize)); - } - - Integer maxFrameSize = settings.maxFrameSize(); - if (maxFrameSize != null) { - frameSizePolicy.maxFrameSize(maxFrameSize); - } - - Integer initialWindowSize = settings.initialWindowSize(); - if (initialWindowSize != null) { - flowController().initialWindowSize(initialWindowSize); - } - } - - /** - * Calculate the {@code maxStreams} parameter for the {@link Endpoint#maxStreams(int, int)} method based upon - * {@code SETTINGS_MAX_CONCURRENT_STREAMS}. - * @param maxConcurrentStreams {@code SETTINGS_MAX_CONCURRENT_STREAMS}. - * @return the {@code maxStreams} parameter for the {@link Endpoint#maxStreams(int, int)} method. - */ - private int calculateMaxStreams(int maxConcurrentStreams) { - int maxStreams = maxConcurrentStreams + SMALLEST_MAX_CONCURRENT_STREAMS; - return maxStreams < 0 ? MAX_VALUE : maxStreams; - } - - @Override - public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception { - // Acknowledge receipt of the settings. - encoder.writeSettingsAck(ctx, ctx.newPromise()); - - encoder.remoteSettings(settings); - - listener.onSettingsRead(ctx, settings); - } - - @Override - public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { - // Send an ack back to the remote client. - // Need to retain the buffer here since it will be released after the write completes. - encoder.writePing(ctx, true, data.retainedSlice(), ctx.newPromise()); - - listener.onPingRead(ctx, data); - } - - @Override - public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { - listener.onPingAckRead(ctx, data); - } - - @Override - public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, - Http2Headers headers, int padding) throws Http2Exception { - // A client cannot push. - if (connection().isServer()) { - throw connectionError(PROTOCOL_ERROR, "A client cannot push."); - } - - Http2Stream parentStream = connection.stream(streamId); - - if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, parentStream, "PUSH_PROMISE")) { - return; - } - - if (parentStream == null) { - throw connectionError(PROTOCOL_ERROR, "Stream %d does not exist", streamId); - } - - switch (parentStream.state()) { - case OPEN: - case HALF_CLOSED_LOCAL: - // Allowed to receive push promise in these states. - break; - default: - // Connection error. - throw connectionError(PROTOCOL_ERROR, - "Stream %d in unexpected state for receiving push promise: %s", - parentStream.id(), parentStream.state()); - } - - if (!requestVerifier.isAuthoritative(ctx, headers)) { - throw streamError(promisedStreamId, PROTOCOL_ERROR, - "Promised request on stream %d for promised stream %d is not authoritative", - streamId, promisedStreamId); - } - if (!requestVerifier.isCacheable(headers)) { - throw streamError(promisedStreamId, PROTOCOL_ERROR, - "Promised request on stream %d for promised stream %d is not known to be cacheable", - streamId, promisedStreamId); - } - if (!requestVerifier.isSafe(headers)) { - throw streamError(promisedStreamId, PROTOCOL_ERROR, - "Promised request on stream %d for promised stream %d is not known to be safe", - streamId, promisedStreamId); - } - - // Reserve the push stream based with a priority based on the current stream's priority. - connection.remote().reservePushStream(promisedStreamId, parentStream); - - listener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding); - } - - @Override - public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) - throws Http2Exception { - onGoAwayRead0(ctx, lastStreamId, errorCode, debugData); - } - - @Override - public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) - throws Http2Exception { - Http2Stream stream = connection.stream(streamId); - if (stream == null || stream.state() == CLOSED || streamCreatedAfterGoAwaySent(streamId)) { - // Ignore this frame. - verifyStreamMayHaveExisted(streamId); - return; - } - - // Update the outbound flow control window. - encoder.flowController().incrementWindowSize(stream, windowSizeIncrement); - - listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement); - } - - @Override - public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, - ByteBuf payload) throws Http2Exception { - onUnknownFrame0(ctx, frameType, streamId, flags, payload); - } - - /** - * Helper method to determine if a frame that has the semantics of headers or data should be ignored for the - * {@code stream} (which may be {@code null}) associated with {@code streamId}. - */ - private boolean shouldIgnoreHeadersOrDataFrame(ChannelHandlerContext ctx, int streamId, Http2Stream stream, - String frameName) throws Http2Exception { - if (stream == null) { - if (streamCreatedAfterGoAwaySent(streamId)) { - logger.info("{} ignoring {} frame for stream {}. Stream sent after GOAWAY sent", - ctx.channel(), frameName, streamId); - return true; - } - // Its possible that this frame would result in stream ID out of order creation (PROTOCOL ERROR) and its - // also possible that this frame is received on a CLOSED stream (STREAM_CLOSED after a RST_STREAM is - // sent). We don't have enough information to know for sure, so we choose the lesser of the two errors. - throw streamError(streamId, STREAM_CLOSED, "Received %s frame for an unknown stream %d", - frameName, streamId); - } else if (stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) { - if (logger.isInfoEnabled()) { - logger.info("{} ignoring {} frame for stream {} {}", ctx.channel(), frameName, - stream.isResetSent() ? "RST_STREAM sent." : - ("Stream created after GOAWAY sent. Last known stream by peer " + - connection.remote().lastStreamKnownByPeer())); - } - return true; - } - return false; - } - - /** - * Helper method for determining whether or not to ignore inbound frames. A stream is considered to be created - * after a {@code GOAWAY} is sent if the following conditions hold: - *

- *

- *

- */ - private boolean streamCreatedAfterGoAwaySent(int streamId) { - Endpoint remote = connection.remote(); - return connection.goAwaySent() && remote.isValidStreamId(streamId) && - streamId > remote.lastStreamKnownByPeer(); - } - - private void verifyStreamMayHaveExisted(int streamId) throws Http2Exception { - if (!connection.streamMayHaveExisted(streamId)) { - throw connectionError(PROTOCOL_ERROR, "Stream %d does not exist", streamId); - } - } - } - - private final class PrefaceFrameListener implements Http2FrameListener { - /** - * Verifies that the HTTP/2 connection preface has been received from the remote endpoint. - * It is possible that the current call to - * {@link Http2FrameReader#readFrame(ChannelHandlerContext, ByteBuf, Http2FrameListener)} will have multiple - * frames to dispatch. So it may be OK for this class to get legitimate frames for the first readFrame. - */ - private void verifyPrefaceReceived() throws Http2Exception { - if (!prefaceReceived()) { - throw connectionError(PROTOCOL_ERROR, "Received non-SETTINGS as first frame."); - } - } - - @Override - public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) - throws Http2Exception { - verifyPrefaceReceived(); - return internalFrameListener.onDataRead(ctx, streamId, data, padding, endOfStream); - } - - @Override - public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, - boolean endOfStream) throws Http2Exception { - verifyPrefaceReceived(); - internalFrameListener.onHeadersRead(ctx, streamId, headers, padding, endOfStream); - } - - @Override - public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, - short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception { - verifyPrefaceReceived(); - internalFrameListener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, - exclusive, padding, endOfStream); - } - - @Override - public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, - boolean exclusive) throws Http2Exception { - verifyPrefaceReceived(); - internalFrameListener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive); - } - - @Override - public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception { - verifyPrefaceReceived(); - internalFrameListener.onRstStreamRead(ctx, streamId, errorCode); - } - - @Override - public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception { - verifyPrefaceReceived(); - internalFrameListener.onSettingsAckRead(ctx); - } - - @Override - public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception { - // The first settings should change the internalFrameListener to the "real" listener - // that expects the preface to be verified. - if (!prefaceReceived()) { - internalFrameListener = new FrameReadListener(); - } - internalFrameListener.onSettingsRead(ctx, settings); - } - - @Override - public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { - verifyPrefaceReceived(); - internalFrameListener.onPingRead(ctx, data); - } - - @Override - public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { - verifyPrefaceReceived(); - internalFrameListener.onPingAckRead(ctx, data); - } - - @Override - public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, - Http2Headers headers, int padding) throws Http2Exception { - verifyPrefaceReceived(); - internalFrameListener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding); - } - - @Override - public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) - throws Http2Exception { - onGoAwayRead0(ctx, lastStreamId, errorCode, debugData); - } - - @Override - public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) - throws Http2Exception { - verifyPrefaceReceived(); - internalFrameListener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement); - } - - @Override - public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, - ByteBuf payload) throws Http2Exception { - onUnknownFrame0(ctx, frameType, streamId, flags, payload); - } - } -}