Bring netty5 up to head to support HTTP2 draft 12 so we can test with GFE

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=69006100
This commit is contained in:
lryan 2014-06-11 11:50:49 -07:00 committed by Eric Anderson
parent d784765814
commit 0cf7d4c353
8 changed files with 216 additions and 171 deletions

View File

@ -60,6 +60,8 @@ public class ByteBufDeframer extends Deframer<ByteBuf> {
+ frameLength + ", readableBytes=" + frame.readableBytes());
}
if (TransportFrameUtil.isNotCompressed(compressionType)) {
// Need to retain the frame as we may be holding it over channel events
frame.retain();
return frame;
}
throw new IOException("Unknown compression type " + compressionType);

View File

@ -13,7 +13,6 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http2.draft10.frame.Http2FrameCodec;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.GenericFutureListener;
@ -28,7 +27,6 @@ public class Http2Client {
private final String host;
private final int port;
private final RequestRegistry requestRegistry;
private ChannelFuture channelFuture;
private final SSLEngine sslEngine;
public Http2Client(String host, int port, RequestRegistry requestRegistry) {
@ -56,6 +54,7 @@ public class Http2Client {
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
// TODO(user): Evaluate use of pooled allocator
b.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
final Http2Codec http2Codec = new Http2Codec(requestRegistry);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
@ -63,18 +62,16 @@ public class Http2Client {
// Assume TLS when using SSL
ch.pipeline().addLast(new SslHandler(sslEngine, false));
}
ch.pipeline().addLast(
new Http2FrameCodec(),
new Http2Codec(requestRegistry));
ch.pipeline().addLast(http2Codec);
}
});
// Start the client.
channelFuture = b.connect(host, port);
ChannelFuture channelFuture = b.connect(host, port);
// Wait for the connection
channelFuture.sync(); // (5)
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.addListener(new WorkerCleanupListener(workerGroup));
return new Http2Session(channelFuture.channel(), requestRegistry);
return new Http2Session(http2Codec.getWriter(), requestRegistry);
} catch (Throwable t) {
workerGroup.shutdownGracefully();
throw Throwables.propagate(t);

View File

@ -9,36 +9,35 @@ import com.google.net.stubby.Response;
import com.google.net.stubby.Session;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.MessageFramer;
import com.google.net.stubby.transport.Transport;
import com.google.net.stubby.transport.Transport.Code;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.draft10.Http2Error;
import io.netty.handler.codec.http2.draft10.Http2Headers;
import io.netty.handler.codec.http2.draft10.frame.DefaultHttp2RstStreamFrame;
import io.netty.handler.codec.http2.draft10.frame.Http2DataFrame;
import io.netty.handler.codec.http2.draft10.frame.Http2HeadersFrame;
import io.netty.handler.codec.http2.draft10.frame.Http2RstStreamFrame;
import io.netty.handler.codec.http2.draft10.frame.Http2StreamFrame;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
/**
* Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing
* request-response dialog
*/
public class Http2Codec extends ChannelHandlerAdapter {
public class Http2Codec extends AbstractHttp2ConnectionHandler {
public static final int PADDING = 0;
private final boolean client;
private final RequestRegistry requestRegistry;
private final Session session;
private ByteBufAllocator alloc;
private Http2Codec.Http2Writer http2Writer;
/**
* Constructor used by servers, takes a session which will receive operation events.
*/
public Http2Codec(Session session, RequestRegistry requestRegistry) {
super(true, true);
// TODO(user): Use connection.isServer when not private in base class
this.client = false;
this.session = session;
this.requestRegistry = requestRegistry;
@ -48,54 +47,140 @@ public class Http2Codec extends ChannelHandlerAdapter {
* Constructor used by clients to send operations to a remote server
*/
public Http2Codec(RequestRegistry requestRegistry) {
super(false, true);
this.client = true;
this.session = null;
this.requestRegistry = requestRegistry;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// Abort any active requests.
requestRegistry.drainAllRequests(new Status(Transport.Code.ABORTED));
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
http2Writer = new Http2Writer(ctx);
}
super.channelInactive(ctx);
public Http2Writer getWriter() {
return http2Writer;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof Http2StreamFrame)) {
return;
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream, boolean endOfSegment, boolean compressed)
throws Http2Exception {
Request request = requestRegistry.lookup(streamId);
if (request == null) {
// Stream may have been terminated already or this is just plain spurious
throw Http2Exception.format(Http2Error.STREAM_CLOSED, "Stream does not exist");
}
this.alloc = ctx.alloc();
Http2StreamFrame frame = (Http2StreamFrame) msg;
int streamId = frame.getStreamId();
Request operation = requestRegistry.lookup(streamId);
Operation operation = client ? request.getResponse() : request;
try {
if (operation == null) {
if (client) {
// For clients an operation must already exist in the registry
throw new IllegalStateException("Response operation must already be bound");
} else {
operation = serverStart(ctx, frame);
if (operation == null) {
closeWithError(new NoOpRequest(createResponse(ctx, streamId).build()),
new Status(Code.NOT_FOUND));
}
}
} else {
// Consume the frame
progress(client ? operation.getResponse() : operation, frame);
ByteBufDeframer deframer = getOrCreateDeframer(operation, ctx);
deframer.deframe(data, operation);
if (endOfStream) {
finish(operation);
}
} catch (Throwable e) {
// TODO(user): Need to disambiguate between stream corruption as well as client/server
// generated errors. For stream corruption we always just send reset stream. For
// clients we will also generally reset-stream on error, servers may send a more detailed
// status.
Status status = Status.fromThrowable(e);
if (operation == null) {
// Create a no-op request so we can use common error handling
operation = new NoOpRequest(createResponse(ctx, streamId).build());
}
closeWithError(operation, status);
closeWithError(request, status);
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endStream, boolean endSegment) throws Http2Exception {
Request operation = requestRegistry.lookup(streamId);
if (operation == null) {
if (client) {
// For clients an operation must already exist in the registry
throw Http2Exception.format(Http2Error.REFUSED_STREAM, "Stream does not exist");
} else {
operation = serverStart(ctx, streamId, headers);
if (operation == null) {
closeWithError(new NoOpRequest(createResponse(new Http2Writer(ctx), streamId).build()),
new Status(Code.NOT_FOUND));
}
}
}
if (endStream) {
finish(client ? operation.getResponse() : operation);
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding,
boolean endStream, boolean endSegment) throws Http2Exception {
onHeadersRead(ctx, streamId, headers, padding, endStream, endSegment);
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
short weight, boolean exclusive) throws Http2Exception {
// TODO
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
Request request = requestRegistry.lookup(streamId);
if (request != null) {
closeWithError(request, new Status(Code.CANCELLED, "Stream reset"));
requestRegistry.remove(streamId);
}
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
// TOOD
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception {
// TOOD
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
// TODO
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
// TODO
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) throws Http2Exception {
// TODO
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
ByteBuf debugData) throws Http2Exception {
// TODO
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
throws Http2Exception {
// TODO
}
@Override
public void onAltSvcRead(ChannelHandlerContext ctx, int streamId, long maxAge, int port,
ByteBuf protocolId, String host, String origin) throws Http2Exception {
// TODO
}
@Override
public void onBlockedRead(ChannelHandlerContext ctx, int streamId) throws Http2Exception {
// TODO
}
/**
* Closes the request and its associated response with an internal error.
@ -106,51 +191,37 @@ public class Http2Codec extends ChannelHandlerAdapter {
request.getResponse().close(status);
} finally {
requestRegistry.remove(request.getId());
disposeDeframer(request);
}
}
/**
* Create an HTTP2 response handler
*/
private Response.ResponseBuilder createResponse(ChannelHandlerContext ctx, int streamId) {
return Http2Response.builder(streamId, ctx.channel(), new MessageFramer(4096));
}
/**
* Writes the HTTP/2 RST Stream frame to the remote endpoint, indicating a stream failure.
*/
private void sendRstStream(ChannelHandlerContext ctx, int streamId, Http2Error error) {
DefaultHttp2RstStreamFrame frame = new DefaultHttp2RstStreamFrame.Builder()
.setStreamId(streamId).setErrorCode(error.getCode()).build();
ctx.writeAndFlush(frame);
private Response.ResponseBuilder createResponse(Http2Writer writer, int streamId) {
return Http2Response.builder(streamId, writer, new MessageFramer(4096));
}
/**
* Start the Request operation on the server
*/
private Request serverStart(ChannelHandlerContext ctx, Http2StreamFrame frame) {
if (!(frame instanceof Http2HeadersFrame)) {
// TODO(user): Better error detail to client here
return null;
}
Http2HeadersFrame headers = (Http2HeadersFrame) frame;
if (!Http2Session.PROTORPC.equals(headers.getHeaders().get("content-type"))) {
private Request serverStart(ChannelHandlerContext ctx, int streamId, Http2Headers headers) {
if (!Http2Session.PROTORPC.equals(headers.get("content-type"))) {
return null;
}
// Use Path to specify the operation
String operationName =
normalizeOperationName(headers.getHeaders().get(Http2Headers.HttpName.PATH.value()));
normalizeOperationName(headers.get(Http2Headers.HttpName.PATH.value()));
if (operationName == null) {
return null;
}
// Create the operation and bind a HTTP2 response operation
Request op = session.startRequest(operationName, createResponse(ctx, frame.getStreamId()));
Request op = session.startRequest(operationName, createResponse(new Http2Writer(ctx),
streamId));
if (op == null) {
return null;
}
requestRegistry.register(op);
// Immediately deframe the remaining headers in the frame
progressHeaders(op, (Http2HeadersFrame) frame);
return op;
}
@ -159,57 +230,6 @@ public class Http2Codec extends ChannelHandlerAdapter {
return path.substring(1);
}
/**
* Consume a received frame
*/
private void progress(Operation operation, Http2StreamFrame frame) {
if (frame instanceof Http2HeadersFrame) {
progressHeaders(operation, (Http2HeadersFrame) frame);
} else if (frame instanceof Http2DataFrame) {
progressPayload(operation, (Http2DataFrame) frame);
} else if (frame instanceof Http2RstStreamFrame) {
// Cancel
operation.close(new Status(Code.ABORTED, "HTTP2 stream reset"));
finish(operation);
} else {
// TODO(user): More refined handling for PING, GO_AWAY, SYN_STREAM, WINDOW_UPDATE, SETTINGS
operation.close(Status.OK);
finish(operation);
}
}
/**
* Consume headers in the frame. Any header starting with ':' is considered reserved
*/
private void progressHeaders(Operation operation, Http2HeadersFrame frame) {
// TODO(user): Currently we do not do anything with HTTP2 headers
if (frame.isEndOfStream()) {
finish(operation);
}
}
private void progressPayload(Operation operation, Http2DataFrame frame) {
try {
// Copy the data buffer.
// TODO(user): Need to decide whether to use pooling or not.
ByteBuf dataCopy = frame.content().copy();
if (operation == null) {
return;
}
ByteBufDeframer deframer = getOrCreateDeframer(operation);
deframer.deframe(dataCopy, operation);
if (frame.isEndOfStream()) {
finish(operation);
}
} finally {
frame.release();
}
}
/**
* Called when a HTTP2 stream is closed.
*/
@ -221,10 +241,10 @@ public class Http2Codec extends ChannelHandlerAdapter {
}
}
public ByteBufDeframer getOrCreateDeframer(Operation operation) {
public ByteBufDeframer getOrCreateDeframer(Operation operation, ChannelHandlerContext ctx) {
ByteBufDeframer deframer = operation.get(ByteBufDeframer.class);
if (deframer == null) {
deframer = new ByteBufDeframer(alloc);
deframer = new ByteBufDeframer(ctx.alloc());
operation.put(ByteBufDeframer.class, deframer);
}
return deframer;
@ -236,4 +256,39 @@ public class Http2Codec extends ChannelHandlerAdapter {
deframer.dispose();
}
}
public class Http2Writer {
private final ChannelHandlerContext ctx;
public Http2Writer(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
public ChannelFuture writeData(int streamId, ByteBuf data, boolean endStream,
boolean endSegment, boolean compressed) {
return Http2Codec.this.writeData(ctx, ctx.newPromise(),
streamId, data, PADDING, endStream, endSegment, compressed);
}
public ChannelFuture writeHeaders(int streamId,
Http2Headers headers,
boolean endStream, boolean endSegment) {
return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId,
headers, PADDING, endStream, endSegment);
}
public ChannelFuture writeHeaders(int streamId, Http2Headers headers, int streamDependency,
short weight, boolean exclusive,
boolean endStream, boolean endSegment) {
return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId,
headers, streamDependency, weight, exclusive, PADDING, endStream, endSegment);
}
public ChannelFuture writeRstStream(int streamId, long errorCode) {
return Http2Codec.this.writeRstStream(ctx, ctx.newPromise(),
streamId,
errorCode);
}
}
}

View File

@ -6,25 +6,23 @@ import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Framer;
import com.google.net.stubby.transport.Transport;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.http2.draft10.frame.DefaultHttp2DataFrame;
import java.io.InputStream;
import java.nio.ByteBuffer;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
/**
* Base implementation of {@link Operation} that writes HTTP2 frames
*/
abstract class Http2Operation extends AbstractOperation implements Framer.Sink {
protected final Framer framer;
private final Channel channel;
private final Framer framer;
private final Http2Codec.Http2Writer writer;
Http2Operation(int streamId, Channel channel, Framer framer) {
Http2Operation(int streamId, Http2Codec.Http2Writer writer, Framer framer) {
super(streamId);
this.channel = channel;
this.writer = writer;
this.framer = framer;
}
@ -55,10 +53,10 @@ abstract class Http2Operation extends AbstractOperation implements Framer.Sink {
@Override
public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
boolean closed = getPhase() == Phase.CLOSED;
DefaultHttp2DataFrame dataFrame = new DefaultHttp2DataFrame.Builder().setStreamId(getId())
.setContent(Unpooled.wrappedBuffer(frame)).setEndOfStream(closed).build();
try {
ChannelFuture channelFuture = channel.writeAndFlush(dataFrame);
ChannelFuture channelFuture = writer.writeData(getId(),
Unpooled.wrappedBuffer(frame), closed, closed, false);
if (!closed) {
// Sync for all except the last frame to prevent buffer corruption.
channelFuture.get();

View File

@ -4,14 +4,12 @@ import com.google.net.stubby.Request;
import com.google.net.stubby.Response;
import com.google.net.stubby.transport.Framer;
import io.netty.channel.Channel;
import io.netty.handler.codec.http2.draft10.DefaultHttp2Headers;
import io.netty.handler.codec.http2.draft10.Http2Headers;
import io.netty.handler.codec.http2.draft10.frame.DefaultHttp2HeadersFrame;
import java.net.InetAddress;
import java.net.UnknownHostException;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
/**
* A HTTP2 based implementation of {@link Request}
*/
@ -29,22 +27,19 @@ class Http2Request extends Http2Operation implements Request {
HOST_NAME = hostName;
}
private static DefaultHttp2HeadersFrame createHeadersFrame(int id, String operationName) {
Http2Headers headers = DefaultHttp2Headers.newBuilder()
.setMethod("POST")
.setPath("/" + operationName)
.setAuthority(HOST_NAME)
.setScheme("https")
.add("content-type", Http2Session.PROTORPC)
.build();
return new DefaultHttp2HeadersFrame.Builder().setStreamId(id).setHeaders(headers).build();
}
private final Response response;
public Http2Request(Response response, Channel channel, String operationName, Framer framer) {
super(response.getId(), channel, framer);
channel.write(createHeadersFrame(response.getId(), operationName));
public Http2Request(Response response, String operationName,
Http2Codec.Http2Writer writer, Framer framer) {
super(response.getId(), writer, framer);
Http2Headers headers = DefaultHttp2Headers.newBuilder()
.method("POST")
.path("/" + operationName)
.authority(HOST_NAME)
.scheme("https")
.add("content-type", Http2Session.PROTORPC)
.build();
writer.writeHeaders(response.getId(), headers, false, true);
this.response = response;
}

View File

@ -3,14 +3,13 @@ package com.google.net.stubby.http2.netty;
import com.google.net.stubby.Response;
import com.google.net.stubby.transport.Framer;
import io.netty.channel.Channel;
/**
* A HTTP2 based implementation of a {@link Response}.
*/
class Http2Response extends Http2Operation implements Response {
public static ResponseBuilder builder(final int id, final Channel channel, final Framer framer) {
public static ResponseBuilder builder(final int id, final Http2Codec.Http2Writer writer,
final Framer framer) {
return new ResponseBuilder() {
@Override
public Response build(int id) {
@ -19,12 +18,12 @@ class Http2Response extends Http2Operation implements Response {
@Override
public Response build() {
return new Http2Response(id, channel, framer);
return new Http2Response(id, writer, framer);
}
};
}
private Http2Response(int id, Channel channel, Framer framer) {
super(id, channel, framer);
private Http2Response(int id, Http2Codec.Http2Writer writer, Framer framer) {
super(id, writer, framer);
}
}

View File

@ -13,7 +13,6 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http2.draft10.frame.Http2FrameCodec;
/**
* Simple server connection startup that attaches a {@link Session} implementation to a connection.
@ -42,7 +41,7 @@ public class Http2Server implements Runnable {
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new Http2FrameCodec(), new Http2Codec(session, operations));
ch.pipeline().addLast(new Http2Codec(session, operations));
}
}).option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

View File

@ -6,10 +6,10 @@ import com.google.net.stubby.Response;
import com.google.net.stubby.Session;
import com.google.net.stubby.transport.MessageFramer;
import io.netty.channel.Channel;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.ChannelHandlerContext;
/**
* An implementation of {@link Session} that can be used by clients to start
* a {@link Request}
@ -18,12 +18,12 @@ public class Http2Session implements Session {
public static final String PROTORPC = "application/protorpc";
private final Channel channel;
private final Http2Codec.Http2Writer writer;
private final RequestRegistry requestRegistry;
private AtomicInteger streamId;
private final AtomicInteger streamId;
public Http2Session(Channel channel, RequestRegistry requestRegistry) {
this.channel = channel;
public Http2Session(Http2Codec.Http2Writer writer, RequestRegistry requestRegistry) {
this.writer = writer;
this.requestRegistry = requestRegistry;
// Clients are odd numbers starting at 3. A value of 1 is reserved for the upgrade protocol.
streamId = new AtomicInteger(3);
@ -36,8 +36,8 @@ public class Http2Session implements Session {
@Override
public Request startRequest(String operationName, Response.ResponseBuilder response) {
int nextSessionId = getNextStreamId();
Request operation = new Http2Request(response.build(nextSessionId), channel, operationName,
new MessageFramer(4096));
Request operation = new Http2Request(response.build(nextSessionId), operationName,
writer, new MessageFramer(4096));
requestRegistry.register(operation);
return operation;
}