diff --git a/core/src/main/java/io/grpc/internal/KeepAliveManager.java b/core/src/main/java/io/grpc/internal/KeepAliveManager.java index 087a7ca53e..5413cd92fe 100644 --- a/core/src/main/java/io/grpc/internal/KeepAliveManager.java +++ b/core/src/main/java/io/grpc/internal/KeepAliveManager.java @@ -49,6 +49,7 @@ public class KeepAliveManager { private final ScheduledExecutorService scheduler; private final ManagedClientTransport transport; private final Ticker ticker; + private final boolean keepAliveDuringTransportIdle; private State state = State.IDLE; private long nextKeepaliveTime; private ScheduledFuture shutdownFuture; @@ -101,7 +102,8 @@ public class KeepAliveManager { private enum State { /* - * Transport has no active rpcs. We don't need to do any keepalives. + * We don't need to do any keepalives. This means the transport has no active rpcs and + * keepAliveDuringTransportIdle == false. */ IDLE, /* @@ -131,27 +133,34 @@ public class KeepAliveManager { * Creates a KeepAliverManager. */ public KeepAliveManager(ManagedClientTransport transport, ScheduledExecutorService scheduler, - long keepAliveDelayInNanos, long keepAliveTimeoutInNanos) { - this.transport = Preconditions.checkNotNull(transport, "transport"); - this.scheduler = Preconditions.checkNotNull(scheduler, "scheduler"); - this.ticker = SYSTEM_TICKER; - // Set a minimum cap on keepalive dealy. - this.keepAliveDelayInNanos = Math.max(MIN_KEEPALIVE_DELAY_NANOS, keepAliveDelayInNanos); - this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; - nextKeepaliveTime = ticker.read() + keepAliveDelayInNanos; + long keepAliveDelayInNanos, long keepAliveTimeoutInNanos, + boolean keepAliveDuringTransportIdle) { + this(transport, scheduler, SYSTEM_TICKER, + // Set a minimum cap on keepalive dealy. + Math.max(MIN_KEEPALIVE_DELAY_NANOS, keepAliveDelayInNanos), keepAliveTimeoutInNanos, + keepAliveDuringTransportIdle); } @VisibleForTesting KeepAliveManager(ManagedClientTransport transport, ScheduledExecutorService scheduler, - Ticker ticker, long keepAliveDelayInNanos, long keepAliveTimeoutInNanos) { + Ticker ticker, long keepAliveDelayInNanos, long keepAliveTimeoutInNanos, + boolean keepAliveDuringTransportIdle) { this.transport = Preconditions.checkNotNull(transport, "transport"); this.scheduler = Preconditions.checkNotNull(scheduler, "scheduler"); this.ticker = Preconditions.checkNotNull(ticker, "ticker"); this.keepAliveDelayInNanos = keepAliveDelayInNanos; this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; + this.keepAliveDuringTransportIdle = keepAliveDuringTransportIdle; nextKeepaliveTime = ticker.read() + keepAliveDelayInNanos; } + /** Start keepalive monitoring. */ + public synchronized void onTransportStarted() { + if (keepAliveDuringTransportIdle) { + onTransportActive(); + } + } + /** * Transport has received some data so that we can delay sending keepalives. */ @@ -192,13 +201,16 @@ public class KeepAliveManager { TimeUnit.NANOSECONDS); } else if (state == State.IDLE_AND_PING_SENT) { state = State.PING_SENT; - } + } // Other states are possible when keepAliveDuringTransportIdle == true } /** * Transport has finished all streams. */ public synchronized void onTransportIdle() { + if (keepAliveDuringTransportIdle) { + return; + } if (state == State.PING_SCHEDULED || state == State.PING_DELAYED) { state = State.IDLE; } diff --git a/core/src/test/java/io/grpc/internal/KeepAliveManagerTest.java b/core/src/test/java/io/grpc/internal/KeepAliveManagerTest.java index ee575e18f7..14830c7db4 100644 --- a/core/src/test/java/io/grpc/internal/KeepAliveManagerTest.java +++ b/core/src/test/java/io/grpc/internal/KeepAliveManagerTest.java @@ -32,6 +32,7 @@ package io.grpc.internal; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -70,7 +71,8 @@ public final class KeepAliveManagerTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - keepAliveManager = new KeepAliveManager(transport, scheduler, ticker, 1000, 2000); + keepAliveManager = new KeepAliveManager(transport, scheduler, ticker, 1000, 2000, false); + keepAliveManager.onTransportStarted(); } @Test @@ -261,6 +263,33 @@ public final class KeepAliveManagerTest { verify(scheduler, times(1)).schedule(isA(Runnable.class), isA(Long.class), isA(TimeUnit.class)); } + @Test + public void transportGoesIdle_doesntCauseIdleWhenEnabled() { + keepAliveManager.onTransportShutdown(); + keepAliveManager = new KeepAliveManager(transport, scheduler, ticker, 1000, 2000, true); + keepAliveManager.onTransportStarted(); + + // Keepalive scheduling should have started immediately. + ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(scheduler).schedule(runnableCaptor.capture(), isA(Long.class), + isA(TimeUnit.class)); + Runnable sendPing = runnableCaptor.getValue(); + + keepAliveManager.onTransportActive(); + + // Transport becomes idle. Should not impact the sending of the ping. + keepAliveManager.onTransportIdle(); + sendPing.run(); + // Ping was sent. + verify(transport).ping(isA(ClientTransport.PingCallback.class), isA(Executor.class)); + // Shutdown is scheduled. + verify(scheduler, times(2)).schedule(runnableCaptor.capture(), isA(Long.class), + isA(TimeUnit.class)); + // Shutdown is triggered. + runnableCaptor.getValue().run(); + verify(transport).shutdownNow(any(Status.class)); + } + @Test public void transportGoesIdleAfterPingSent() { // Transport becomes active. We should schedule keepalive pings. diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 0be1e1f509..0633f39fb0 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -232,7 +232,8 @@ class NettyClientTransport implements ConnectionClientTransport { if (enableKeepAlive) { keepAliveManager = new KeepAliveManager(this, channel.eventLoop(), keepAliveDelayNanos, - keepAliveTimeoutNanos); + keepAliveTimeoutNanos, false); + keepAliveManager.onTransportStarted(); } return null; diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 225216a497..d9bc926844 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -370,7 +370,8 @@ class OkHttpClientTransport implements ConnectionClientTransport { if (enableKeepAlive) { scheduler = SharedResourceHolder.get(TIMER_SERVICE); keepAliveManager = new KeepAliveManager(this, scheduler, keepAliveDelayNanos, - keepAliveTimeoutNanos); + keepAliveTimeoutNanos, false); + keepAliveManager.onTransportStarted(); } frameWriter = new AsyncFrameWriter(this, serializingExecutor);