diff --git a/NOTICE.txt b/NOTICE.txt index ee67bad4ba..e2f96722a8 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -38,4 +38,14 @@ at: * HOMEPAGE: * https://github.com/square/okhttp * LOCATION_IN_GRPC: - * okhttp/third_party/okhttp \ No newline at end of file + * okhttp/third_party/okhttp + +This product contains a modified portion of 'Netty', an open source +networking library, which can be obtained at: + + * LICENSE: + * netty/third_party/netty/LICENSE.txt (Apache License 2.0) + * HOMEPAGE: + * https://netty.io + * LOCATION_IN_GRPC: + * netty/third_party/netty diff --git a/netty/build.gradle b/netty/build.gradle index b3c6c5a3c9..a8197af47b 100644 --- a/netty/build.gradle +++ b/netty/build.gradle @@ -12,3 +12,12 @@ dependencies { } javadoc.options.links 'http://netty.io/4.1/api/' + +project.sourceSets { + main { + java { + srcDir "${projectDir}/third_party/netty/java" + } + } +} + diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 0028f0b2ad..4d339c978e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -56,7 +56,6 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.DefaultHttp2Connection; -import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; @@ -149,7 +148,8 @@ class NettyClientHandler extends AbstractNettyHandler { connection.local().flowController( new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true)); - Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, + // TODO(ejona): swap back to DefaultHttp2Connection with Netty-4.1.9 + Http2ConnectionDecoder decoder = new FixedHttp2ConnectionDecoder(connection, encoder, frameReader); Http2Settings settings = new Http2Settings(); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index d89b1ce81a..2438d581ae 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -54,7 +54,6 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.DefaultHttp2Connection; -import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; @@ -136,7 +135,8 @@ class NettyServerHandler extends AbstractNettyHandler { Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter); - Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, + // TODO(ejona): swap back to DefaultHttp2Connection with Netty-4.1.9 + Http2ConnectionDecoder decoder = new FixedHttp2ConnectionDecoder(connection, encoder, frameReader); Http2Settings settings = new Http2Settings(); diff --git a/netty/third_party/netty/LICENSE.txt b/netty/third_party/netty/LICENSE.txt new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/netty/third_party/netty/LICENSE.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/netty/third_party/netty/java/io/grpc/netty/FixedHttp2ConnectionDecoder.java b/netty/third_party/netty/java/io/grpc/netty/FixedHttp2ConnectionDecoder.java new file mode 100644 index 0000000000..f7444805a9 --- /dev/null +++ b/netty/third_party/netty/java/io/grpc/netty/FixedHttp2ConnectionDecoder.java @@ -0,0 +1,711 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.grpc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.*; +import io.netty.handler.codec.http2.Http2Connection.Endpoint; +import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException; +import io.netty.util.internal.UnstableApi; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.util.List; + +import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; +import static io.netty.handler.codec.http2.Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS; +import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; +import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; +import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED; +import static io.netty.handler.codec.http2.Http2Exception.connectionError; +import static io.netty.handler.codec.http2.Http2Exception.streamError; +import static io.netty.handler.codec.http2.Http2PromisedRequestVerifier.ALWAYS_VERIFY; +import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED; +import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE; +import static io.netty.util.internal.ObjectUtil.checkNotNull; +import static java.lang.Integer.MAX_VALUE; +import static java.lang.Math.min; + +/** + * Provides the default implementation for processing inbound frame events and delegates to a + * {@link Http2FrameListener} + *

+ * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener} + *

+ * This interface enforces inbound flow control functionality through + * {@link Http2LocalFlowController} + */ +@UnstableApi +public class FixedHttp2ConnectionDecoder implements Http2ConnectionDecoder { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(FixedHttp2ConnectionDecoder.class); + private Http2FrameListener internalFrameListener = new PrefaceFrameListener(); + private final Http2Connection connection; + private Http2LifecycleManager lifecycleManager; + private final Http2ConnectionEncoder encoder; + private final Http2FrameReader frameReader; + private Http2FrameListener listener; + private final Http2PromisedRequestVerifier requestVerifier; + + public FixedHttp2ConnectionDecoder(Http2Connection connection, + Http2ConnectionEncoder encoder, + Http2FrameReader frameReader) { + this(connection, encoder, frameReader, ALWAYS_VERIFY); + } + + public FixedHttp2ConnectionDecoder(Http2Connection connection, + Http2ConnectionEncoder encoder, + Http2FrameReader frameReader, + Http2PromisedRequestVerifier requestVerifier) { + this.connection = checkNotNull(connection, "connection"); + this.frameReader = checkNotNull(frameReader, "frameReader"); + this.encoder = checkNotNull(encoder, "encoder"); + this.requestVerifier = checkNotNull(requestVerifier, "requestVerifier"); + if (connection.local().flowController() == null) { + connection.local().flowController(new DefaultHttp2LocalFlowController(connection)); + } + connection.local().flowController().frameWriter(encoder.frameWriter()); + } + + @Override + public void lifecycleManager(Http2LifecycleManager lifecycleManager) { + this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager"); + } + + @Override + public Http2Connection connection() { + return connection; + } + + @Override + public final Http2LocalFlowController flowController() { + return connection.local().flowController(); + } + + @Override + public void frameListener(Http2FrameListener listener) { + this.listener = checkNotNull(listener, "listener"); + } + + @Override + public Http2FrameListener frameListener() { + return listener; + } + + // Visible for testing + Http2FrameListener internalFrameListener() { + return internalFrameListener; + } + + @Override + public boolean prefaceReceived() { + return FrameReadListener.class == internalFrameListener.getClass(); + } + + @Override + public void decodeFrame(ChannelHandlerContext ctx, ByteBuf in, List out) throws Http2Exception { + frameReader.readFrame(ctx, in, internalFrameListener); + } + + @Override + public Http2Settings localSettings() { + Http2Settings settings = new Http2Settings(); + Http2FrameReader.Configuration config = frameReader.configuration(); + Http2HeadersDecoder.Configuration headersConfig = config.headersConfiguration(); + Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy(); + settings.initialWindowSize(flowController().initialWindowSize()); + settings.maxConcurrentStreams(connection.remote().maxActiveStreams()); + settings.headerTableSize(headersConfig.maxHeaderTableSize()); + settings.maxFrameSize(frameSizePolicy.maxFrameSize()); + settings.maxHeaderListSize(headersConfig.maxHeaderListSize()); + if (!connection.isServer()) { + // Only set the pushEnabled flag if this is a client endpoint. + settings.pushEnabled(connection.local().allowPushTo()); + } + return settings; + } + + @Override + public void close() { + frameReader.close(); + } + + /** + * Calculate the threshold in bytes which should trigger a {@code GO_AWAY} if a set of headers exceeds this amount. + * @param maxHeaderListSize + * SETTINGS_MAX_HEADER_LIST_SIZE for the local + * endpoint. + * @return the threshold in bytes which should trigger a {@code GO_AWAY} if a set of headers exceeds this amount. + */ + protected long calculateMaxHeaderListSizeGoAway(long maxHeaderListSize) { + return Http2CodecUtil.calculateMaxHeaderListSizeGoAway(maxHeaderListSize); + } + + private int unconsumedBytes(Http2Stream stream) { + return flowController().unconsumedBytes(stream); + } + + void onGoAwayRead0(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) + throws Http2Exception { + if (connection.goAwayReceived() && connection.local().lastStreamKnownByPeer() < lastStreamId) { + throw connectionError(PROTOCOL_ERROR, "lastStreamId MUST NOT increase. Current value: %d new value: %d", + connection.local().lastStreamKnownByPeer(), lastStreamId); + } + listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData); + connection.goAwayReceived(lastStreamId, errorCode, debugData); + } + + void onUnknownFrame0(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, + ByteBuf payload) throws Http2Exception { + listener.onUnknownFrame(ctx, frameType, streamId, flags, payload); + } + + /** + * Handles all inbound frames from the network. + */ + private final class FrameReadListener implements Http2FrameListener { + @Override + public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + boolean endOfStream) throws Http2Exception { + Http2Stream stream = connection.stream(streamId); + Http2LocalFlowController flowController = flowController(); + int bytesToReturn = data.readableBytes() + padding; + + final boolean shouldIgnore; + try { + shouldIgnore = shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, "DATA"); + } catch (Http2Exception e) { + // Ignoring this frame. We still need to count the frame towards the connection flow control + // window, but we immediately mark all bytes as consumed. + flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream); + flowController.consumeBytes(stream, bytesToReturn); + throw e; + } catch (Throwable t) { + throw connectionError(INTERNAL_ERROR, t, "Unhandled error on data stream id %d", streamId); + } + + if (shouldIgnore) { + // Ignoring this frame. We still need to count the frame towards the connection flow control + // window, but we immediately mark all bytes as consumed. + flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream); + flowController.consumeBytes(stream, bytesToReturn); + + // Verify that the stream may have existed after we apply flow control. + verifyStreamMayHaveExisted(streamId); + + // All bytes have been consumed. + return bytesToReturn; + } + + Http2Exception error = null; + switch (stream.state()) { + case OPEN: + case HALF_CLOSED_LOCAL: + break; + case HALF_CLOSED_REMOTE: + case CLOSED: + error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s", + stream.id(), stream.state()); + break; + default: + error = streamError(stream.id(), PROTOCOL_ERROR, + "Stream %d in unexpected state: %s", stream.id(), stream.state()); + break; + } + + int unconsumedBytes = unconsumedBytes(stream); + try { + flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream); + // Update the unconsumed bytes after flow control is applied. + unconsumedBytes = unconsumedBytes(stream); + + // If the stream is in an invalid state to receive the frame, throw the error. + if (error != null) { + throw error; + } + + // Call back the application and retrieve the number of bytes that have been + // immediately processed. + bytesToReturn = listener.onDataRead(ctx, streamId, data, padding, endOfStream); + return bytesToReturn; + } catch (Http2Exception e) { + // If an exception happened during delivery, the listener may have returned part + // of the bytes before the error occurred. If that's the case, subtract that from + // the total processed bytes so that we don't return too many bytes. + int delta = unconsumedBytes - unconsumedBytes(stream); + bytesToReturn -= delta; + throw e; + } catch (RuntimeException e) { + // If an exception happened during delivery, the listener may have returned part + // of the bytes before the error occurred. If that's the case, subtract that from + // the total processed bytes so that we don't return too many bytes. + int delta = unconsumedBytes - unconsumedBytes(stream); + bytesToReturn -= delta; + throw e; + } finally { + // If appropriate, return the processed bytes to the flow controller. + flowController.consumeBytes(stream, bytesToReturn); + + if (endOfStream) { + lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture()); + } + } + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, + boolean endOfStream) throws Http2Exception { + onHeadersRead(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endOfStream); + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, + short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception { + Http2Stream stream = connection.stream(streamId); + boolean allowHalfClosedRemote = false; + if (stream == null && !connection.streamMayHaveExisted(streamId)) { + stream = connection.remote().createStream(streamId, endOfStream); + // Allow the state to be HALF_CLOSE_REMOTE if we're creating it in that state. + allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE; + } + + if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, "HEADERS")) { + return; + } + + switch (stream.state()) { + case RESERVED_REMOTE: + stream.open(endOfStream); + break; + case OPEN: + case HALF_CLOSED_LOCAL: + // Allowed to receive headers in these states. + break; + case HALF_CLOSED_REMOTE: + if (!allowHalfClosedRemote) { + throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s", + stream.id(), stream.state()); + } + break; + case CLOSED: + throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s", + stream.id(), stream.state()); + default: + // Connection error. + throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(), + stream.state()); + } + + try { + // This call will create a stream for streamDependency if necessary. + // For this reason it must be done before notifying the listener. + stream.setPriority(streamDependency, weight, exclusive); + } catch (ClosedStreamCreationException ignored) { + // It is possible that either the stream for this frame or the parent stream is closed. + // In this case we should ignore the exception and allow the frame to be sent. + } + + listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endOfStream); + + // If the headers completes this stream, close it. + if (endOfStream) { + lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture()); + } + } + + @Override + public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, + boolean exclusive) throws Http2Exception { + Http2Stream stream = connection.stream(streamId); + + try { + if (stream == null) { + if (connection.streamMayHaveExisted(streamId)) { + logger.info("{} ignoring PRIORITY frame for stream {}. Stream doesn't exist but may " + + " have existed", ctx.channel(), streamId); + return; + } + + // PRIORITY frames always identify a stream. This means that if a PRIORITY frame is the + // first frame to be received for a stream that we must create the stream. + stream = connection.remote().createIdleStream(streamId); + } else if (streamCreatedAfterGoAwaySent(streamId)) { + logger.info("{} ignoring PRIORITY frame for stream {}. Stream created after GOAWAY sent. " + + "Last known stream by peer {}", + ctx.channel(), streamId, connection.remote().lastStreamKnownByPeer()); + return; + } + + // This call will create a stream for streamDependency if necessary. + // For this reason it must be done before notifying the listener. + stream.setPriority(streamDependency, weight, exclusive); + } catch (ClosedStreamCreationException ignored) { + // It is possible that either the stream for this frame or the parent stream is closed. + // In this case we should ignore the exception and allow the frame to be sent. + } + + listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive); + } + + @Override + public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception { + Http2Stream stream = connection.stream(streamId); + if (stream == null) { + verifyStreamMayHaveExisted(streamId); + return; + } + + switch(stream.state()) { + case IDLE: + throw connectionError(PROTOCOL_ERROR, "RST_STREAM received for IDLE stream %d", streamId); + case CLOSED: + return; // RST_STREAM frames must be ignored for closed streams. + default: + break; + } + + listener.onRstStreamRead(ctx, streamId, errorCode); + + lifecycleManager.closeStream(stream, ctx.newSucceededFuture()); + } + + @Override + public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception { + // Apply oldest outstanding local settings here. This is a synchronization point between endpoints. + Http2Settings settings = encoder.pollSentSettings(); + + if (settings != null) { + applyLocalSettings(settings); + } + + listener.onSettingsAckRead(ctx); + } + + /** + * Applies settings sent from the local endpoint. + *

+ * This method is only called after the local settings have been acknowledged from the remote endpoint. + */ + private void applyLocalSettings(Http2Settings settings) throws Http2Exception { + Boolean pushEnabled = settings.pushEnabled(); + final Http2FrameReader.Configuration config = frameReader.configuration(); + final Http2HeadersDecoder.Configuration headerConfig = config.headersConfiguration(); + final Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy(); + if (pushEnabled != null) { + if (connection.isServer()) { + throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified"); + } + connection.local().allowPushTo(pushEnabled); + } + + Long maxConcurrentStreams = settings.maxConcurrentStreams(); + if (maxConcurrentStreams != null) { + int value = (int) min(maxConcurrentStreams, MAX_VALUE); + connection.remote().maxStreams(value, calculateMaxStreams(value)); + } + + Long headerTableSize = settings.headerTableSize(); + if (headerTableSize != null) { + headerConfig.maxHeaderTableSize(headerTableSize); + } + + Long maxHeaderListSize = settings.maxHeaderListSize(); + if (maxHeaderListSize != null) { + headerConfig.maxHeaderListSize(maxHeaderListSize, calculateMaxHeaderListSizeGoAway(maxHeaderListSize)); + } + + Integer maxFrameSize = settings.maxFrameSize(); + if (maxFrameSize != null) { + frameSizePolicy.maxFrameSize(maxFrameSize); + } + + Integer initialWindowSize = settings.initialWindowSize(); + if (initialWindowSize != null) { + flowController().initialWindowSize(initialWindowSize); + } + } + + /** + * Calculate the {@code maxStreams} parameter for the {@link Endpoint#maxStreams(int, int)} method based upon + * {@code SETTINGS_MAX_CONCURRENT_STREAMS}. + * @param maxConcurrentStreams {@code SETTINGS_MAX_CONCURRENT_STREAMS}. + * @return the {@code maxStreams} parameter for the {@link Endpoint#maxStreams(int, int)} method. + */ + private int calculateMaxStreams(int maxConcurrentStreams) { + int maxStreams = maxConcurrentStreams + SMALLEST_MAX_CONCURRENT_STREAMS; + return maxStreams < 0 ? MAX_VALUE : maxStreams; + } + + @Override + public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception { + encoder.remoteSettings(settings); + + // Acknowledge receipt of the settings. + encoder.writeSettingsAck(ctx, ctx.newPromise()); + + listener.onSettingsRead(ctx, settings); + } + + @Override + public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { + // Send an ack back to the remote client. + // Need to retain the buffer here since it will be released after the write completes. + encoder.writePing(ctx, true, data.retainedSlice(), ctx.newPromise()); + + listener.onPingRead(ctx, data); + } + + @Override + public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { + listener.onPingAckRead(ctx, data); + } + + @Override + public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, + Http2Headers headers, int padding) throws Http2Exception { + // A client cannot push. + if (connection().isServer()) { + throw connectionError(PROTOCOL_ERROR, "A client cannot push."); + } + + Http2Stream parentStream = connection.stream(streamId); + + if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, parentStream, "PUSH_PROMISE")) { + return; + } + + if (parentStream == null) { + throw connectionError(PROTOCOL_ERROR, "Stream %d does not exist", streamId); + } + + switch (parentStream.state()) { + case OPEN: + case HALF_CLOSED_LOCAL: + // Allowed to receive push promise in these states. + break; + default: + // Connection error. + throw connectionError(PROTOCOL_ERROR, + "Stream %d in unexpected state for receiving push promise: %s", + parentStream.id(), parentStream.state()); + } + + if (!requestVerifier.isAuthoritative(ctx, headers)) { + throw streamError(promisedStreamId, PROTOCOL_ERROR, + "Promised request on stream %d for promised stream %d is not authoritative", + streamId, promisedStreamId); + } + if (!requestVerifier.isCacheable(headers)) { + throw streamError(promisedStreamId, PROTOCOL_ERROR, + "Promised request on stream %d for promised stream %d is not known to be cacheable", + streamId, promisedStreamId); + } + if (!requestVerifier.isSafe(headers)) { + throw streamError(promisedStreamId, PROTOCOL_ERROR, + "Promised request on stream %d for promised stream %d is not known to be safe", + streamId, promisedStreamId); + } + + // Reserve the push stream based with a priority based on the current stream's priority. + connection.remote().reservePushStream(promisedStreamId, parentStream); + + listener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding); + } + + @Override + public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) + throws Http2Exception { + onGoAwayRead0(ctx, lastStreamId, errorCode, debugData); + } + + @Override + public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) + throws Http2Exception { + Http2Stream stream = connection.stream(streamId); + if (stream == null || stream.state() == CLOSED || streamCreatedAfterGoAwaySent(streamId)) { + // Ignore this frame. + verifyStreamMayHaveExisted(streamId); + return; + } + + // Update the outbound flow control window. + encoder.flowController().incrementWindowSize(stream, windowSizeIncrement); + + listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement); + } + + @Override + public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, + ByteBuf payload) throws Http2Exception { + onUnknownFrame0(ctx, frameType, streamId, flags, payload); + } + + /** + * Helper method to determine if a frame that has the semantics of headers or data should be ignored for the + * {@code stream} (which may be {@code null}) associated with {@code streamId}. + */ + private boolean shouldIgnoreHeadersOrDataFrame(ChannelHandlerContext ctx, int streamId, Http2Stream stream, + String frameName) throws Http2Exception { + if (stream == null) { + if (streamCreatedAfterGoAwaySent(streamId)) { + logger.info("{} ignoring {} frame for stream {}. Stream sent after GOAWAY sent", + ctx.channel(), frameName, streamId); + return true; + } + // Its possible that this frame would result in stream ID out of order creation (PROTOCOL ERROR) and its + // also possible that this frame is received on a CLOSED stream (STREAM_CLOSED after a RST_STREAM is + // sent). We don't have enough information to know for sure, so we choose the lesser of the two errors. + throw streamError(streamId, STREAM_CLOSED, "Received %s frame for an unknown stream %d", + frameName, streamId); + } else if (stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) { + if (logger.isInfoEnabled()) { + logger.info("{} ignoring {} frame for stream {} {}", ctx.channel(), frameName, + stream.isResetSent() ? "RST_STREAM sent." : + ("Stream created after GOAWAY sent. Last known stream by peer " + + connection.remote().lastStreamKnownByPeer())); + } + return true; + } + return false; + } + + /** + * Helper method for determining whether or not to ignore inbound frames. A stream is considered to be created + * after a {@code GOAWAY} is sent if the following conditions hold: + *

+ *

+ *

+ */ + private boolean streamCreatedAfterGoAwaySent(int streamId) { + Endpoint remote = connection.remote(); + return connection.goAwaySent() && remote.isValidStreamId(streamId) && + streamId > remote.lastStreamKnownByPeer(); + } + + private void verifyStreamMayHaveExisted(int streamId) throws Http2Exception { + if (!connection.streamMayHaveExisted(streamId)) { + throw connectionError(PROTOCOL_ERROR, "Stream %d does not exist", streamId); + } + } + } + + private final class PrefaceFrameListener implements Http2FrameListener { + /** + * Verifies that the HTTP/2 connection preface has been received from the remote endpoint. + * It is possible that the current call to + * {@link Http2FrameReader#readFrame(ChannelHandlerContext, ByteBuf, Http2FrameListener)} will have multiple + * frames to dispatch. So it may be OK for this class to get legitimate frames for the first readFrame. + */ + private void verifyPrefaceReceived() throws Http2Exception { + if (!prefaceReceived()) { + throw connectionError(PROTOCOL_ERROR, "Received non-SETTINGS as first frame."); + } + } + + @Override + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) + throws Http2Exception { + verifyPrefaceReceived(); + return internalFrameListener.onDataRead(ctx, streamId, data, padding, endOfStream); + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, + boolean endOfStream) throws Http2Exception { + verifyPrefaceReceived(); + internalFrameListener.onHeadersRead(ctx, streamId, headers, padding, endOfStream); + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, + short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception { + verifyPrefaceReceived(); + internalFrameListener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, + exclusive, padding, endOfStream); + } + + @Override + public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, + boolean exclusive) throws Http2Exception { + verifyPrefaceReceived(); + internalFrameListener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive); + } + + @Override + public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception { + verifyPrefaceReceived(); + internalFrameListener.onRstStreamRead(ctx, streamId, errorCode); + } + + @Override + public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception { + verifyPrefaceReceived(); + internalFrameListener.onSettingsAckRead(ctx); + } + + @Override + public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception { + // The first settings should change the internalFrameListener to the "real" listener + // that expects the preface to be verified. + if (!prefaceReceived()) { + internalFrameListener = new FrameReadListener(); + } + internalFrameListener.onSettingsRead(ctx, settings); + } + + @Override + public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { + verifyPrefaceReceived(); + internalFrameListener.onPingRead(ctx, data); + } + + @Override + public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { + verifyPrefaceReceived(); + internalFrameListener.onPingAckRead(ctx, data); + } + + @Override + public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, + Http2Headers headers, int padding) throws Http2Exception { + verifyPrefaceReceived(); + internalFrameListener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding); + } + + @Override + public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) + throws Http2Exception { + onGoAwayRead0(ctx, lastStreamId, errorCode, debugData); + } + + @Override + public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) + throws Http2Exception { + verifyPrefaceReceived(); + internalFrameListener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement); + } + + @Override + public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, + ByteBuf payload) throws Http2Exception { + onUnknownFrame0(ctx, frameType, streamId, flags, payload); + } + } +}