diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 9ed519139a..7164a165d0 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -79,6 +79,8 @@ class NettyServer implements InternalServer { private final int maxHeaderListSize; private final long keepAliveTimeInNanos; private final long keepAliveTimeoutInNanos; + private final long maxConnectionAgeInNanos; + private final long maxConnectionAgeGraceInNanos; private final boolean permitKeepAliveWithoutCalls; private final long permitKeepAliveTimeInNanos; private final ReferenceCounted eventLoopReferenceCounter = new EventLoopReferenceCounter(); @@ -90,6 +92,7 @@ class NettyServer implements InternalServer { ProtocolNegotiator protocolNegotiator, List streamTracerFactories, int maxStreamsPerConnection, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, + long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) { this.address = address; this.channelType = checkNotNull(channelType, "channelType"); @@ -105,6 +108,8 @@ class NettyServer implements InternalServer { this.maxHeaderListSize = maxHeaderListSize; this.keepAliveTimeInNanos = keepAliveTimeInNanos; this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; + this.maxConnectionAgeInNanos = maxConnectionAgeInNanos; + this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos; this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls; this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; } @@ -141,8 +146,10 @@ class NettyServer implements InternalServer { NettyServerTransport transport = new NettyServerTransport( ch, protocolNegotiator, streamTracerFactories, maxStreamsPerConnection, - flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, - keepAliveTimeoutInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); + flowControlWindow, maxMessageSize, maxHeaderListSize, + keepAliveTimeInNanos, keepAliveTimeoutInNanos, + maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, + permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); ServerTransportListener transportListener; // This is to order callbacks on the listener, not to guard access to channel. synchronized (NettyServer.this) { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index 7d161a93a7..b6f47537f8 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -65,8 +65,12 @@ import javax.net.ssl.SSLException; public final class NettyServerBuilder extends AbstractServerImplBuilder { public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB + static final long MAX_CONNECTION_AGE_NANOS_DISABLED = Long.MAX_VALUE; + static final long MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE = Long.MAX_VALUE; + private static final long MIN_KEEPALIVE_TIME_NANO = TimeUnit.MILLISECONDS.toNanos(1L); private static final long MIN_KEEPALIVE_TIMEOUT_NANO = TimeUnit.MICROSECONDS.toNanos(499L); + private static final long MIN_MAX_CONNECTION_AGE_NANO = TimeUnit.SECONDS.toNanos(1L); private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L); private final SocketAddress address; @@ -83,6 +87,8 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder 0L, "max connection age must be positive"); + maxConnectionAgeInNanos = timeUnit.toNanos(maxConnectionAge); + if (maxConnectionAgeInNanos >= AS_LARGE_AS_INFINITE) { + maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED; + } + if (maxConnectionAgeInNanos < MIN_MAX_CONNECTION_AGE_NANO) { + maxConnectionAgeInNanos = MIN_MAX_CONNECTION_AGE_NANO; + } + return this; + } + + /** + * Sets a custom grace time for the graceful connection termination. Once the max connection age + * is reached, RPCs have the grace time to complete. RPCs that do not complete in time will be + * cancelled, allowing the connection to terminate. {@code Long.MAX_VALUE} nano seconds or an + * unreasonably large value are considered infinite. + * + * @see #maxConnectionAge(long, TimeUnit) + * @since 1.3.0 + */ + public NettyServerBuilder maxConnectionAgeGrace(long maxConnectionAgeGrace, TimeUnit timeUnit) { + checkArgument(maxConnectionAgeGrace >= 0L, "max connection age grace must be non-negative"); + maxConnectionAgeGraceInNanos = timeUnit.toNanos(maxConnectionAgeGrace); + if (maxConnectionAgeGraceInNanos >= AS_LARGE_AS_INFINITE) { + maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE; + } + return this; + } + /** * Specify the most aggressive keep-alive time clients are permitted to configure. The server will * try to detect clients exceeding this rate and when detected will forcefully close the @@ -322,6 +365,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder streamTracerFactories; private final KeepAliveEnforcer keepAliveEnforcer; private Attributes attributes; @@ -114,6 +119,7 @@ class NettyServerHandler extends AbstractNettyHandler { private WriteQueue serverWriteQueue; private AsciiString lastKnownAuthority; private KeepAliveManager keepAliveManager; + private ScheduledFuture maxConnectionAgeMonitor; static NettyServerHandler newHandler( ServerTransportListener transportListener, @@ -124,6 +130,8 @@ class NettyServerHandler extends AbstractNettyHandler { int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, + long maxConnectionAgeInNanos, + long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); @@ -133,9 +141,12 @@ class NettyServerHandler extends AbstractNettyHandler { new DefaultHttp2FrameReader(headersDecoder), frameLogger); Http2FrameWriter frameWriter = new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger); - return newHandler(frameReader, frameWriter, transportListener, streamTracerFactories, - maxStreams, flowControlWindow, maxHeaderListSize, maxMessageSize, keepAliveTimeInNanos, - keepAliveTimeoutInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); + return newHandler( + frameReader, frameWriter, transportListener, streamTracerFactories, + maxStreams, flowControlWindow, maxHeaderListSize, maxMessageSize, + keepAliveTimeInNanos, keepAliveTimeoutInNanos, + maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, + permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); } @VisibleForTesting @@ -149,6 +160,8 @@ class NettyServerHandler extends AbstractNettyHandler { int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, + long maxConnectionAgeInNanos, + long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) { Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive"); @@ -191,8 +204,11 @@ class NettyServerHandler extends AbstractNettyHandler { settings.maxConcurrentStreams(maxStreams); settings.maxHeaderListSize(maxHeaderListSize); - return new NettyServerHandler(transportListener, streamTracerFactories, decoder, encoder, - settings, maxMessageSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos, keepAliveEnforcer); + return new NettyServerHandler( + transportListener, streamTracerFactories, decoder, encoder, settings, maxMessageSize, + keepAliveTimeInNanos, keepAliveTimeoutInNanos, + maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, + keepAliveEnforcer); } private NettyServerHandler( @@ -203,12 +219,16 @@ class NettyServerHandler extends AbstractNettyHandler { int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, + long maxConnectionAgeInNanos, + long maxConnectionAgeGraceInNanos, KeepAliveEnforcer keepAliveEnforcer) { super(decoder, encoder, settings); checkArgument(maxMessageSize >= 0, "maxMessageSize must be >= 0"); this.maxMessageSize = maxMessageSize; this.keepAliveTimeInNanos = keepAliveTimeInNanos; this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; + this.maxConnectionAgeInNanos = maxConnectionAgeInNanos; + this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos; this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer"); streamKey = encoder.connection().newKey(); @@ -225,8 +245,41 @@ class NettyServerHandler extends AbstractNettyHandler { } @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { serverWriteQueue = new WriteQueue(ctx.channel()); + + // init max connection age monitor + if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) { + maxConnectionAgeMonitor = ctx.executor().schedule( + new LogExceptionRunnable(new Runnable() { + @Override + public void run() { + // send GO_AWAY + ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "max_age"); + goAway( + ctx, + Integer.MAX_VALUE, + Http2Error.NO_ERROR.code(), + debugData, + ctx.newPromise()); + + // gracefully shutdown with specified grace time + long savedGracefulShutdownTime = gracefulShutdownTimeoutMillis(); + try { + gracefulShutdownTimeoutMillis( + TimeUnit.NANOSECONDS.toMillis(maxConnectionAgeGraceInNanos)); + close(ctx, ctx.newPromise()); + } catch (Exception e) { + onError(ctx, e); + } finally { + gracefulShutdownTimeoutMillis(savedGracefulShutdownTime); + } + } + }), + maxConnectionAgeInNanos, + TimeUnit.NANOSECONDS); + } + if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) { keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(), keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */); @@ -361,6 +414,9 @@ class NettyServerHandler extends AbstractNettyHandler { if (keepAliveManager != null) { keepAliveManager.onTransportTermination(); } + if (maxConnectionAgeMonitor != null) { + maxConnectionAgeMonitor.cancel(false); + } final Status status = Status.UNAVAILABLE.withDescription("connection terminated for unknown reason"); // Any streams that are still active must be closed diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 853fba9f78..13e67b3e75 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -62,15 +62,19 @@ class NettyServerTransport implements ServerTransport { private final int maxHeaderListSize; private final long keepAliveTimeInNanos; private final long keepAliveTimeoutInNanos; + private final long maxConnectionAgeInNanos; + private final long maxConnectionAgeGraceInNanos; private final boolean permitKeepAliveWithoutCalls; private final long permitKeepAliveTimeInNanos; private final List streamTracerFactories; - NettyServerTransport(Channel channel, ProtocolNegotiator protocolNegotiator, + NettyServerTransport( + Channel channel, ProtocolNegotiator protocolNegotiator, List streamTracerFactories, int maxStreams, - int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeInNanos, - long keepAliveTimeoutInNanos, boolean permitKeepAliveWithoutCalls, - long permitKeepAliveTimeInNanos) { + int flowControlWindow, int maxMessageSize, int maxHeaderListSize, + long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, + long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, + boolean permitKeepAliveWithoutCalls,long permitKeepAliveTimeInNanos) { this.channel = Preconditions.checkNotNull(channel, "channel"); this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator"); this.streamTracerFactories = @@ -81,6 +85,8 @@ class NettyServerTransport implements ServerTransport { this.maxHeaderListSize = maxHeaderListSize; this.keepAliveTimeInNanos = keepAliveTimeInNanos; this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; + this.maxConnectionAgeInNanos = maxConnectionAgeInNanos; + this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos; this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls; this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; } @@ -145,8 +151,11 @@ class NettyServerTransport implements ServerTransport { * Creates the Netty handler to be used in the channel pipeline. */ private NettyServerHandler createHandler(ServerTransportListener transportListener) { - return NettyServerHandler.newHandler(transportListener, streamTracerFactories, maxStreams, - flowControlWindow, maxHeaderListSize, maxMessageSize, keepAliveTimeInNanos, - keepAliveTimeoutInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); + return NettyServerHandler.newHandler( + transportListener, streamTracerFactories, maxStreams, + flowControlWindow, maxHeaderListSize, maxMessageSize, + keepAliveTimeInNanos, keepAliveTimeoutInNanos, + maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, + permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); } } diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 0085714fff..1537989391 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -38,6 +38,8 @@ import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED; import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY; +import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE; +import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -482,7 +484,8 @@ public class NettyClientTransportTest { NioServerSocketChannel.class, group, group, negotiator, Collections.emptyList(), maxStreamsPerConnection, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize, - DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS, true, 0); + DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS, + MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0); server.start(serverListener); address = TestUtils.testServerAddress(server.getPort()); authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 33060113e1..de226d80e3 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -33,6 +33,8 @@ package io.grpc.netty; import static com.google.common.base.Charsets.UTF_8; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; +import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE; +import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED; import static io.grpc.netty.Utils.CONTENT_TYPE_GRPC; import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER; import static io.grpc.netty.Utils.HTTP_METHOD; @@ -122,6 +124,8 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase