Remove SPDY support from GRPC

Will re-create the E2E tests through GFE when we can properly initiate HTTP2 with GFE.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=68488736
This commit is contained in:
lryan 2014-06-03 14:22:33 -07:00 committed by Eric Anderson
parent ef31a5f2ae
commit f8bbc12468
20 changed files with 79 additions and 719 deletions

View File

@ -0,0 +1,11 @@
package com.google.net.stubby;
/**
* A request that does no work.
*/
public class NoOpRequest extends AbstractRequest {
public NoOpRequest(Response response) {
super(response);
}
}

View File

@ -15,9 +15,6 @@ import java.nio.ByteOrder;
/**
* Parse a sequence of {@link ByteBuf} instances that represent the frames of a GRPC call
*
* TODO(user): This is essentially a duplicate of the spdy deframer. Should find a way to
* share common code.
*/
public class ByteBufDeframer extends Deframer<ByteBuf> {

View File

@ -1,9 +1,11 @@
package com.google.net.stubby.http2.netty;
import com.google.net.stubby.NoOpRequest;
import com.google.net.stubby.Operation;
import com.google.net.stubby.Operation.Phase;
import com.google.net.stubby.Request;
import com.google.net.stubby.RequestRegistry;
import com.google.net.stubby.Response;
import com.google.net.stubby.Session;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.MessageFramer;
@ -66,7 +68,8 @@ public class Http2Codec extends ChannelHandlerAdapter {
}
this.alloc = ctx.alloc();
Http2StreamFrame frame = (Http2StreamFrame) msg;
Request operation = requestRegistry.lookup(frame.getStreamId());
int streamId = frame.getStreamId();
Request operation = requestRegistry.lookup(streamId);
try {
if (operation == null) {
if (client) {
@ -75,8 +78,8 @@ public class Http2Codec extends ChannelHandlerAdapter {
} else {
operation = serverStart(ctx, frame);
if (operation == null) {
// Unknown operation, refuse the stream
sendRstStream(ctx, frame.getStreamId(), Http2Error.REFUSED_STREAM);
closeWithError(new NoOpRequest(createResponse(ctx, streamId).build()),
new Status(Code.NOT_FOUND));
}
}
} else {
@ -84,22 +87,33 @@ public class Http2Codec extends ChannelHandlerAdapter {
progress(client ? operation.getResponse() : operation, frame);
}
} catch (Throwable e) {
closeWithInternalError(operation, e);
sendRstStream(ctx, frame.getStreamId(), Http2Error.INTERNAL_ERROR);
throw e;
Status status = Status.fromThrowable(e);
if (operation == null) {
// Create a no-op request so we can use common error handling
operation = new NoOpRequest(createResponse(ctx, streamId).build());
}
closeWithError(operation, status);
}
}
/**
* Closes the request and its associated response with an internal error.
*/
private void closeWithError(Request request, Status status) {
try {
request.close(status);
request.getResponse().close(status);
} finally {
requestRegistry.remove(request.getId());
}
}
/**
* Closes the request and its associate response with an internal error.
* Create an HTTP2 response handler
*/
private void closeWithInternalError(Request request, Throwable e) {
if (request != null) {
Status status = new Status(Code.INTERNAL, e);
request.close(status);
request.getResponse().close(status);
requestRegistry.remove(request.getId());
}
private Response.ResponseBuilder createResponse(ChannelHandlerContext ctx, int streamId) {
return Http2Response.builder(streamId, ctx.channel(), new MessageFramer(4096));
}
/**
@ -130,8 +144,7 @@ public class Http2Codec extends ChannelHandlerAdapter {
return null;
}
// Create the operation and bind a HTTP2 response operation
Request op = session.startRequest(operationName,
Http2Response.builder(frame.getStreamId(), ctx.channel(), new MessageFramer(4096)));
Request op = session.startRequest(operationName, createResponse(ctx, frame.getStreamId()));
if (op == null) {
return null;
}
@ -157,11 +170,11 @@ public class Http2Codec extends ChannelHandlerAdapter {
progressPayload(operation, (Http2DataFrame) frame);
} else if (frame instanceof Http2RstStreamFrame) {
// Cancel
operation.close(null);
operation.close(new Status(Code.ABORTED, "HTTP2 stream reset"));
finish(operation);
} else {
// TODO(user): More refined handling for PING, GO_AWAY, SYN_STREAM, WINDOW_UPDATE, SETTINGS
operation.close(null);
operation.close(Status.OK);
finish(operation);
}
}

View File

@ -13,7 +13,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
/**
* A SPDY based implementation of {@link Request}
* A HTTP2 based implementation of {@link Request}
*/
class Http2Request extends Http2Operation implements Request {

View File

@ -6,7 +6,7 @@ import com.google.net.stubby.transport.Framer;
import io.netty.channel.Channel;
/**
* A SPDY based implementation of a {@link Response}.
* A HTTP2 based implementation of a {@link Response}.
*/
class Http2Response extends Http2Operation implements Response {

View File

@ -14,7 +14,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* An implementation of {@link Session} that can be used by clients to start
* a {@link Request}
*/
class Http2Session implements Session {
public class Http2Session implements Session {
public static final String PROTORPC = "application/protorpc";

View File

@ -1,4 +1,4 @@
package com.google.net.stubby.spdy.okhttp;
package com.google.net.stubby.http2.okhttp;
import com.google.common.collect.Lists;

View File

@ -1,4 +1,4 @@
package com.google.net.stubby.spdy.okhttp;
package com.google.net.stubby.http2.okhttp;
import com.google.common.io.ByteBuffers;
import com.google.net.stubby.AbstractOperation;
@ -16,14 +16,14 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
/**
* Base implementation of {@link Operation} that writes SPDY frames
* Base implementation of {@link Operation} that writes HTTP2 frames
*/
abstract class SpdyOperation extends AbstractOperation implements Framer.Sink {
abstract class Http2Operation extends AbstractOperation implements Framer.Sink {
protected final Framer framer;
private final FrameWriter frameWriter;
SpdyOperation(int id, FrameWriter frameWriter, Framer framer) {
Http2Operation(int id, FrameWriter frameWriter, Framer framer) {
super(id);
this.frameWriter = frameWriter;
this.framer = framer;

View File

@ -1,4 +1,4 @@
package com.google.net.stubby.spdy.okhttp;
package com.google.net.stubby.http2.okhttp;
import com.google.net.stubby.Request;
import com.google.net.stubby.RequestRegistry;
@ -12,12 +12,12 @@ import com.squareup.okhttp.internal.spdy.FrameWriter;
import java.io.IOException;
/**
* A SPDY based implementation of {@link Request}
* A HTTP2 based implementation of {@link Request}
*/
public class SpdyRequest extends SpdyOperation implements Request {
public class Http2Request extends Http2Operation implements Request {
private final Response response;
public SpdyRequest(FrameWriter frameWriter, String operationName,
public Http2Request(FrameWriter frameWriter, String operationName,
Response response, RequestRegistry requestRegistry,
Framer framer) {
super(response.getId(), frameWriter, framer);

View File

@ -1,4 +1,4 @@
package com.google.net.stubby.spdy.okhttp;
package com.google.net.stubby.http2.okhttp;
import com.google.net.stubby.Response;
import com.google.net.stubby.Status;
@ -10,9 +10,9 @@ import com.squareup.okhttp.internal.spdy.FrameWriter;
import java.io.IOException;
/**
* A SPDY based implementation of a {@link Response}.
* A HTTP2 based implementation of a {@link Response}.
*/
public class SpdyResponse extends SpdyOperation implements Response {
public class Http2Response extends Http2Operation implements Response {
public static ResponseBuilder builder(final int id, final FrameWriter framewriter,
final Framer framer) {
@ -24,12 +24,12 @@ public class SpdyResponse extends SpdyOperation implements Response {
@Override
public Response build() {
return new SpdyResponse(id, framewriter, framer);
return new Http2Response(id, framewriter, framer);
}
};
}
private SpdyResponse(int id, FrameWriter frameWriter, Framer framer) {
private Http2Response(int id, FrameWriter frameWriter, Framer framer) {
super(id, frameWriter, framer);
try {
frameWriter.synStream(false, false, getId(), 0, 0, 0, Headers.createResponseHeaders());

View File

@ -1,4 +1,4 @@
package com.google.net.stubby.spdy.okhttp;
package com.google.net.stubby.http2.okhttp;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
@ -16,7 +16,6 @@ import com.google.net.stubby.transport.MessageFramer;
import com.google.net.stubby.transport.Transport;
import com.google.net.stubby.transport.Transport.Code;
import com.squareup.okhttp.Protocol;
import com.squareup.okhttp.internal.spdy.ErrorCode;
import com.squareup.okhttp.internal.spdy.FrameReader;
import com.squareup.okhttp.internal.spdy.FrameWriter;
@ -24,7 +23,6 @@ import com.squareup.okhttp.internal.spdy.Header;
import com.squareup.okhttp.internal.spdy.HeadersMode;
import com.squareup.okhttp.internal.spdy.Http20Draft10;
import com.squareup.okhttp.internal.spdy.Settings;
import com.squareup.okhttp.internal.spdy.Spdy3;
import com.squareup.okhttp.internal.spdy.Variant;
import okio.BufferedSink;
@ -64,19 +62,19 @@ public class OkHttpSession implements Session {
new Status(Transport.Code.PERMISSION_DENIED, "Invalid credentials"))
.build();
public static Session startClient(Protocol protocol, Socket socket,
RequestRegistry requestRegistry, Executor executor) {
public static Session startClient(Socket socket, RequestRegistry requestRegistry,
Executor executor) {
try {
return new OkHttpSession(protocol, socket, requestRegistry, executor);
return new OkHttpSession(socket, requestRegistry, executor);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
public static Session startServer(Protocol protocol, Socket socket, Session server,
RequestRegistry requestRegistry, Executor executor) {
public static Session startServer(Socket socket, Session server, RequestRegistry requestRegistry,
Executor executor) {
try {
return new OkHttpSession(protocol, socket, server, requestRegistry, executor);
return new OkHttpSession(socket, server, requestRegistry, executor);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
@ -93,9 +91,9 @@ public class OkHttpSession implements Session {
/**
* Construct a client-side session
*/
private OkHttpSession(Protocol protocol, Socket socket, RequestRegistry requestRegistry,
private OkHttpSession(Socket socket, RequestRegistry requestRegistry,
Executor executor) throws IOException {
Variant variant = getProtocolVariant(protocol);
Variant variant = new Http20Draft10();
// TODO(user): use Okio.buffer(Socket)
countingInputStream = new CountingInputStream(socket.getInputStream());
countingOutputStream = new CountingOutputStream(socket.getOutputStream());
@ -114,9 +112,9 @@ public class OkHttpSession implements Session {
/**
* Construct a server-side session
*/
private OkHttpSession(Protocol protocol, Socket socket, Session server,
private OkHttpSession(Socket socket, Session server,
RequestRegistry requestRegistry, Executor executor) throws IOException {
Variant variant = getProtocolVariant(protocol);
Variant variant = new Http20Draft10();
// TODO(user): use Okio.buffer(Socket)
countingInputStream = new CountingInputStream(socket.getInputStream());
countingOutputStream = new CountingOutputStream(socket.getOutputStream());
@ -137,17 +135,6 @@ public class OkHttpSession implements Session {
return "in=" + countingInputStream.getCount() + ";out=" + countingOutputStream.getCount();
}
private Variant getProtocolVariant(Protocol protocol) {
switch (protocol) {
case HTTP_2:
return new Http20Draft10();
case SPDY_3:
return new Spdy3();
default:
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
}
}
private int getNextStreamId() {
// Client initiated streams are odd, server initiated ones are even
// We start clients at 3 to avoid conflicting with HTTP negotiation
@ -162,9 +149,9 @@ public class OkHttpSession implements Session {
public Request startRequest(String operationName, Response.ResponseBuilder responseBuilder) {
int nextStreamId = getNextStreamId();
Response response = responseBuilder.build(nextStreamId);
SpdyRequest spdyRequest = new SpdyRequest(frameWriter, operationName, response, requestRegistry,
new MessageFramer(4096));
return spdyRequest;
Http2Request request = new Http2Request(frameWriter, operationName, response,
requestRegistry, new MessageFramer(4096));
return request;
}
/**
@ -219,7 +206,7 @@ public class OkHttpSession implements Session {
/**
* Handle a SPDY DATA frame
* Handle a HTTP2 DATA frame
*/
@Override
public void data(boolean inFinished, int streamId, BufferedSource in, int length)
@ -246,7 +233,7 @@ public class OkHttpSession implements Session {
}
/**
* Called when a SPDY stream is closed.
* Called when a HTTP2 stream is closed.
*/
private void finish(int streamId) {
Request request = requestRegistry.remove(streamId);
@ -256,7 +243,7 @@ public class OkHttpSession implements Session {
}
/**
* Handle a SPDY HEADER or SYN_STREAM frame
* Handle HTTP2 HEADER & CONTINUATION frames
*/
@Override
public void headers(boolean arg0,
@ -269,12 +256,11 @@ public class OkHttpSession implements Session {
Operation op = getOperation(streamId);
// Start an Operation for SYN_STREAM
if (op == null && (headersMode == HeadersMode.SPDY_SYN_STREAM
|| headersMode == HeadersMode.HTTP_20_HEADERS)) {
if (op == null && headersMode == HeadersMode.HTTP_20_HEADERS) {
for (Header header : headers) {
if (header.name.equals(Header.TARGET_PATH)) {
Request request = serverSession.startRequest(header.value.utf8(),
SpdyResponse.builder(streamId, frameWriter, new MessageFramer(4096)));
Http2Response.builder(streamId, frameWriter, new MessageFramer(4096)));
requestRegistry.register(request);
op = request;
break;

View File

@ -1,53 +0,0 @@
package com.google.net.stubby.spdy.netty;
import com.google.net.stubby.transport.Deframer;
import com.google.net.stubby.transport.TransportFrameUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteOrder;
/**
* Parse a sequence of {@link ByteBuf} instances that represent the frames of a GRPC call
*/
public class ByteBufDeframer extends Deframer<ByteBuf> {
private final CompositeByteBuf buffer;
public ByteBufDeframer() {
buffer = Unpooled.compositeBuffer();
}
@Override
protected DataInputStream prefix(ByteBuf frame) throws IOException {
buffer.addComponent(frame);
buffer.writerIndex(buffer.writerIndex() + frame.writerIndex() - frame.readerIndex());
return new DataInputStream(new ByteBufInputStream(buffer));
}
@Override
protected int consolidate() {
buffer.consolidate();
return buffer.readableBytes();
}
@Override
protected ByteBuf decompress(ByteBuf frame) throws IOException {
frame = frame.order(ByteOrder.BIG_ENDIAN);
int compressionType = frame.readUnsignedByte();
int frameLength = frame.readUnsignedMedium();
if (frameLength != frame.readableBytes()) {
throw new IllegalArgumentException("GRPC and buffer lengths misaligned. Frame length="
+ frameLength + ", readableBytes=" + frame.readableBytes());
}
if (TransportFrameUtil.isNotCompressed(compressionType)) {
return frame;
}
throw new IOException("Unknown compression type " + compressionType);
}
}

View File

@ -1,100 +0,0 @@
package com.google.net.stubby.spdy.netty;
import com.google.common.base.Throwables;
import com.google.net.stubby.RequestRegistry;
import com.google.net.stubby.Session;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.spdy.SpdyFrameCodec;
import io.netty.handler.codec.spdy.SpdyVersion;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.GenericFutureListener;
import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
/**
* Simple client connection startup that creates a {@link SpdySession} for use
* with protocol bindings.
*/
public class SpdyClient {
private final String host;
private final int port;
private final RequestRegistry requestRegistry;
private ChannelFuture channelFuture;
private final SSLEngine sslEngine;
public SpdyClient(String host, int port, RequestRegistry requestRegistry) {
this(host, port, requestRegistry, null);
}
public SpdyClient(String host, int port, RequestRegistry requestRegistry,
@Nullable SSLEngine sslEngine) {
this.host = host;
this.port = port;
this.requestRegistry = requestRegistry;
this.sslEngine = sslEngine;
// TODO(user): NPN support
if (sslEngine != null) {
sslEngine.setUseClientMode(true);
}
}
public Session startAndWait() {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
// TODO(user): Evaluate use of pooled allocator
b.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
if (sslEngine != null) {
// Assume TLS when using SSL
ch.pipeline().addLast(new SslHandler(sslEngine, false));
}
ch.pipeline().addLast(
new SpdyFrameCodec(SpdyVersion.SPDY_3_1),
new SpdyCodec(requestRegistry));
}
});
// Start the client.
channelFuture = b.connect(host, port);
// Wait for the connection
channelFuture.sync(); // (5)
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.addListener(new WorkerCleanupListener(workerGroup));
return new SpdySession(channelFuture.channel(), requestRegistry);
} catch (Throwable t) {
workerGroup.shutdownGracefully();
throw Throwables.propagate(t);
}
}
private static class WorkerCleanupListener
implements GenericFutureListener<io.netty.util.concurrent.Future<Void>> {
private final EventLoopGroup workerGroup;
public WorkerCleanupListener(EventLoopGroup workerGroup) {
this.workerGroup = workerGroup;
}
@Override
public void operationComplete(io.netty.util.concurrent.Future<Void> future) throws Exception {
workerGroup.shutdownGracefully();
}
}
}

View File

@ -1,199 +0,0 @@
package com.google.net.stubby.spdy.netty;
import com.google.net.stubby.Operation;
import com.google.net.stubby.Operation.Phase;
import com.google.net.stubby.Request;
import com.google.net.stubby.RequestRegistry;
import com.google.net.stubby.Session;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.MessageFramer;
import com.google.net.stubby.transport.Transport;
import com.google.net.stubby.transport.Transport.Code;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.spdy.DefaultSpdyRstStreamFrame;
import io.netty.handler.codec.spdy.SpdyDataFrame;
import io.netty.handler.codec.spdy.SpdyHeaders;
import io.netty.handler.codec.spdy.SpdyHeadersFrame;
import io.netty.handler.codec.spdy.SpdyRstStreamFrame;
import io.netty.handler.codec.spdy.SpdyStreamFrame;
import io.netty.handler.codec.spdy.SpdyStreamStatus;
import io.netty.handler.codec.spdy.SpdySynStreamFrame;
/**
* Codec used by clients and servers to interpret SPDY frames in the context of an ongoing
* request-response dialog
*/
public class SpdyCodec extends ChannelHandlerAdapter {
private final boolean client;
private final RequestRegistry requestRegistry;
private final Session session;
/**
* Constructor used by servers, takes a session which will receive operation events.
*/
public SpdyCodec(Session session, RequestRegistry requestRegistry) {
this.client = false;
this.session = session;
this.requestRegistry = requestRegistry;
}
/**
* Constructor used by clients to send operations to a remote server
*/
public SpdyCodec(RequestRegistry requestRegistry) {
this.client = true;
this.session = null;
this.requestRegistry = requestRegistry;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// Abort any active requests.
requestRegistry.drainAllRequests(new Status(Transport.Code.ABORTED));
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof SpdyStreamFrame)) {
return;
}
SpdyStreamFrame frame = (SpdyStreamFrame) msg;
Request operation = requestRegistry.lookup(frame.getStreamId());
try {
if (operation == null) {
if (client) {
// For clients an operation must already exist in the registry
throw new IllegalStateException("Response operation must already be bound");
} else {
operation = serverStart(ctx, frame);
if (operation == null) {
// Unknown operation, refuse the stream
sendRstStream(ctx, frame.getStreamId(), SpdyStreamStatus.REFUSED_STREAM);
}
}
} else {
// Consume the frame
progress(client ? operation.getResponse() : operation, frame);
}
} catch (Throwable e) {
closeWithInternalError(operation, e);
sendRstStream(ctx, frame.getStreamId(), SpdyStreamStatus.INTERNAL_ERROR);
throw e;
}
}
/**
* Closes the request and its associate response with an internal error.
*/
private void closeWithInternalError(Request request, Throwable e) {
if (request != null) {
Status status = new Status(Code.INTERNAL, e);
request.close(status);
request.getResponse().close(status);
requestRegistry.remove(request.getId());
}
}
/**
* Writes the Spdy RST Stream frame to the remote endpoint, indicating a stream failure.
*/
private void sendRstStream(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) {
DefaultSpdyRstStreamFrame frame = new DefaultSpdyRstStreamFrame(streamId, status.getCode());
ctx.writeAndFlush(frame);
}
/**
* Start the Request operation on the server
*/
private Request serverStart(ChannelHandlerContext ctx, SpdyStreamFrame frame) {
if (!(frame instanceof SpdySynStreamFrame)) {
// TODO(user): Better error detail to client here
return null;
}
SpdySynStreamFrame headers = (SpdySynStreamFrame) frame;
if (!SpdySession.PROTORPC.equals(headers.headers().get("content-type"))) {
return null;
}
// Use Path to specify the operation
String operationName =
normalizeOperationName(headers.headers().get(SpdyHeaders.HttpNames.PATH));
if (operationName == null) {
return null;
}
// Create the operation and bind a SPDY response operation
Request op = session.startRequest(operationName,
SpdyResponse.builder(frame.getStreamId(), ctx.channel(), new MessageFramer(4096)));
if (op == null) {
return null;
}
requestRegistry.register(op);
// Immediately deframe the remaining headers in the frame
progressHeaders(op, (SpdyHeadersFrame) frame);
return op;
}
// TODO(user): This needs proper namespacing support, this is currently just a hack
private static String normalizeOperationName(String path) {
return path.substring(1);
}
/**
* Consume a received frame
*/
private void progress(Operation operation, SpdyStreamFrame frame) {
if (frame instanceof SpdyHeadersFrame) {
progressHeaders(operation, (SpdyHeadersFrame) frame);
} else if (frame instanceof SpdyDataFrame) {
progressPayload(operation, (SpdyDataFrame) frame);
} else if (frame instanceof SpdyRstStreamFrame) {
// Cancel
operation.close(null);
finish(operation);
} else {
// TODO(user): More refined handling for PING, GO_AWAY, SYN_STREAM, WINDOW_UPDATE, SETTINGS
operation.close(null);
finish(operation);
}
}
/**
* Consume headers in the frame. Any header starting with ';' is considered reserved
*/
private void progressHeaders(Operation operation, SpdyHeadersFrame frame) {
// TODO(user): Currently we do not do anything with SPDY headers
if (frame.isLast()) {
finish(operation);
}
}
private void progressPayload(Operation operation, SpdyDataFrame frame) {
if (operation == null) {
return;
}
ByteBufDeframer deframer = operation.get(ByteBufDeframer.class);
if (deframer == null) {
deframer = new ByteBufDeframer();
operation.put(ByteBufDeframer.class, deframer);
}
deframer.deframe(frame.content(), operation);
if (frame.isLast()) {
finish(operation);
}
}
/**
* Called when a SPDY stream is closed.
*/
private void finish(Operation operation) {
requestRegistry.remove(operation.getId());
if (operation.getPhase() != Phase.CLOSED) {
operation.close(Status.OK);
}
}
}

View File

@ -1,78 +0,0 @@
package com.google.net.stubby.spdy.netty;
import com.google.net.stubby.AbstractOperation;
import com.google.net.stubby.Operation;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Framer;
import com.google.net.stubby.transport.Transport;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.spdy.DefaultSpdyDataFrame;
import io.netty.handler.codec.spdy.SpdyHeadersFrame;
import java.io.InputStream;
import java.nio.ByteBuffer;
/**
* Base implementation of {@link Operation} that writes SPDY frames
*/
abstract class SpdyOperation extends AbstractOperation implements Framer.Sink {
protected final Framer framer;
private final Channel channel;
SpdyOperation(SpdyHeadersFrame headersFrame, Channel channel, Framer framer) {
super(headersFrame.getStreamId());
this.channel = channel;
this.framer = framer;
channel.write(headersFrame);
}
@Override
public Operation addContext(String type, InputStream message, Phase nextPhase) {
super.addContext(type, message, nextPhase);
framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
return this;
}
@Override
public Operation addPayload(InputStream payload, Phase nextPhase) {
super.addPayload(payload, nextPhase);
framer.writePayload(payload, getPhase() == Phase.CLOSED, this);
return this;
}
@Override
public Operation close(Status status) {
boolean alreadyClosed = getPhase() == Phase.CLOSED;
super.close(status);
if (!alreadyClosed) {
framer.writeStatus(status, true, this);
}
return this;
}
@Override
public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
boolean closed = getPhase() == Phase.CLOSED;
DefaultSpdyDataFrame dataFrame = new DefaultSpdyDataFrame(getId(),
Unpooled.wrappedBuffer(frame));
boolean streamClosed = closed && endOfMessage;
dataFrame.setLast(streamClosed);
try {
ChannelFuture channelFuture = channel.writeAndFlush(dataFrame);
if (!streamClosed) {
// Sync for all except the last frame to prevent buffer corruption.
channelFuture.get();
}
} catch (Exception e) {
close(new Status(Transport.Code.INTERNAL, e));
} finally {
if (streamClosed) {
framer.close();
}
}
}
}

View File

@ -1,55 +0,0 @@
package com.google.net.stubby.spdy.netty;
import com.google.net.stubby.Request;
import com.google.net.stubby.Response;
import com.google.net.stubby.transport.Framer;
import io.netty.channel.Channel;
import io.netty.handler.codec.spdy.DefaultSpdySynStreamFrame;
import io.netty.handler.codec.spdy.SpdyHeaders;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
* A SPDY based implementation of {@link Request}
*/
class SpdyRequest extends SpdyOperation implements Request {
// TODO(user): Inject this
private static final String HOST_NAME;
static {
String hostName;
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException uhe) {
hostName = "localhost";
}
HOST_NAME = hostName;
}
private static DefaultSpdySynStreamFrame createHeadersFrame(int id, String operationName) {
DefaultSpdySynStreamFrame headersFrame = new DefaultSpdySynStreamFrame(id, 0, (byte) 0);
headersFrame.headers().add(SpdyHeaders.HttpNames.METHOD, "POST");
// TODO(user) Convert operation names to URIs
headersFrame.headers().add(SpdyHeaders.HttpNames.PATH, "/" + operationName);
headersFrame.headers().add(SpdyHeaders.HttpNames.VERSION, "HTTP/1.1");
headersFrame.headers().add(SpdyHeaders.HttpNames.HOST, HOST_NAME);
headersFrame.headers().add(SpdyHeaders.HttpNames.SCHEME, "https");
headersFrame.headers().add("content-type", SpdySession.PROTORPC);
return headersFrame;
}
private final Response response;
public SpdyRequest(Response response, Channel channel, String operationName,
Framer framer) {
super(createHeadersFrame(response.getId(), operationName), channel, framer);
this.response = response;
}
@Override
public Response getResponse() {
return response;
}
}

View File

@ -1,41 +0,0 @@
package com.google.net.stubby.spdy.netty;
import com.google.net.stubby.Response;
import com.google.net.stubby.transport.Framer;
import io.netty.channel.Channel;
import io.netty.handler.codec.spdy.DefaultSpdySynReplyFrame;
import io.netty.handler.codec.spdy.SpdyHeaders;
/**
* A SPDY based implementation of a {@link Response}.
*/
class SpdyResponse extends SpdyOperation implements Response {
public static ResponseBuilder builder(final int id, final Channel channel,
final Framer framer) {
return new ResponseBuilder() {
@Override
public Response build(int id) {
throw new UnsupportedOperationException();
}
@Override
public Response build() {
return new SpdyResponse(id, channel, framer);
}
};
}
public static DefaultSpdySynReplyFrame createSynReply(int id) {
DefaultSpdySynReplyFrame synReplyFrame = new DefaultSpdySynReplyFrame(id);
// TODO(user): Need to review status code handling
synReplyFrame.headers().add(SpdyHeaders.HttpNames.STATUS, "200");
return synReplyFrame;
}
private SpdyResponse(int id, Channel channel, Framer framer) {
super(createSynReply(id), channel, framer);
}
}

View File

@ -1,75 +0,0 @@
package com.google.net.stubby.spdy.netty;
import com.google.net.stubby.RequestRegistry;
import com.google.net.stubby.Session;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.spdy.SpdyFrameCodec;
import io.netty.handler.codec.spdy.SpdyVersion;
/**
* Simple server connection startup that attaches a {@link Session} implementation to
* a connection.
*/
public class SpdyServer implements Runnable {
private final int port;
private final Session session;
private final RequestRegistry operations;
private Channel channel;
public SpdyServer(int port, Session session, RequestRegistry operations) {
this.port = port;
this.session = session;
this.operations = operations;
}
@Override
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
// TODO(user): Evaluate use of pooled allocator
b.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new SpdyFrameCodec(SpdyVersion.SPDY_3_1),
new SpdyCodec(session, operations));
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and startContext to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
channel = f.channel();
channel.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public void stop() throws Exception {
if (channel != null) {
channel.close().get();
}
}
}

View File

@ -1,46 +0,0 @@
package com.google.net.stubby.spdy.netty;
import com.google.net.stubby.Request;
import com.google.net.stubby.RequestRegistry;
import com.google.net.stubby.Response;
import com.google.net.stubby.Session;
import com.google.net.stubby.transport.MessageFramer;
import io.netty.channel.Channel;
import java.util.concurrent.atomic.AtomicInteger;
/**
* An implementation of {@link Session} that can be used by clients to start
* a {@link Request}
*/
public class SpdySession implements Session {
public static final String PROTORPC = "application/protorpc";
private final Channel channel;
private final boolean clientSession;
private final RequestRegistry requestRegistry;
private AtomicInteger sessionId;
public SpdySession(Channel channel, RequestRegistry requestRegistry) {
this.channel = channel;
this.clientSession = true;
this.requestRegistry = requestRegistry;
// Clients are odd numbers starting at 1, servers are even numbers stating at 2
sessionId = new AtomicInteger(1);
}
private int getNextStreamId() {
return (sessionId.getAndIncrement() * 2) + (clientSession ? -1 : 0);
}
@Override
public Request startRequest(String operationName, Response.ResponseBuilder response) {
int nextSessionId = getNextStreamId();
Request operation = new SpdyRequest(response.build(nextSessionId), channel, operationName,
new MessageFramer(4096));
requestRegistry.register(operation);
return operation;
}
}

View File

@ -15,7 +15,7 @@ import java.util.logging.Logger;
import java.util.zip.Deflater;
/**
* Compression framer for SPDY and HTTP/2 transport frames, for use in both compression and
* Compression framer for HTTP/2 transport frames, for use in both compression and
* non-compression scenarios. Receives message-stream as input. It is able to change compression
* configuration on-the-fly, but will not actually begin using the new configuration until the next
* full frame.