mirror of https://github.com/grpc/grpc-java.git
core: Support keepalive even when transport is idle
Nothing is using this yet, but it will be used on both client and server.
This commit is contained in:
parent
6d44f2ffa4
commit
19afd8b48b
|
@ -49,6 +49,7 @@ public class KeepAliveManager {
|
|||
private final ScheduledExecutorService scheduler;
|
||||
private final ManagedClientTransport transport;
|
||||
private final Ticker ticker;
|
||||
private final boolean keepAliveDuringTransportIdle;
|
||||
private State state = State.IDLE;
|
||||
private long nextKeepaliveTime;
|
||||
private ScheduledFuture<?> shutdownFuture;
|
||||
|
@ -101,7 +102,8 @@ public class KeepAliveManager {
|
|||
|
||||
private enum State {
|
||||
/*
|
||||
* Transport has no active rpcs. We don't need to do any keepalives.
|
||||
* We don't need to do any keepalives. This means the transport has no active rpcs and
|
||||
* keepAliveDuringTransportIdle == false.
|
||||
*/
|
||||
IDLE,
|
||||
/*
|
||||
|
@ -131,27 +133,34 @@ public class KeepAliveManager {
|
|||
* Creates a KeepAliverManager.
|
||||
*/
|
||||
public KeepAliveManager(ManagedClientTransport transport, ScheduledExecutorService scheduler,
|
||||
long keepAliveDelayInNanos, long keepAliveTimeoutInNanos) {
|
||||
this.transport = Preconditions.checkNotNull(transport, "transport");
|
||||
this.scheduler = Preconditions.checkNotNull(scheduler, "scheduler");
|
||||
this.ticker = SYSTEM_TICKER;
|
||||
// Set a minimum cap on keepalive dealy.
|
||||
this.keepAliveDelayInNanos = Math.max(MIN_KEEPALIVE_DELAY_NANOS, keepAliveDelayInNanos);
|
||||
this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
|
||||
nextKeepaliveTime = ticker.read() + keepAliveDelayInNanos;
|
||||
long keepAliveDelayInNanos, long keepAliveTimeoutInNanos,
|
||||
boolean keepAliveDuringTransportIdle) {
|
||||
this(transport, scheduler, SYSTEM_TICKER,
|
||||
// Set a minimum cap on keepalive dealy.
|
||||
Math.max(MIN_KEEPALIVE_DELAY_NANOS, keepAliveDelayInNanos), keepAliveTimeoutInNanos,
|
||||
keepAliveDuringTransportIdle);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
KeepAliveManager(ManagedClientTransport transport, ScheduledExecutorService scheduler,
|
||||
Ticker ticker, long keepAliveDelayInNanos, long keepAliveTimeoutInNanos) {
|
||||
Ticker ticker, long keepAliveDelayInNanos, long keepAliveTimeoutInNanos,
|
||||
boolean keepAliveDuringTransportIdle) {
|
||||
this.transport = Preconditions.checkNotNull(transport, "transport");
|
||||
this.scheduler = Preconditions.checkNotNull(scheduler, "scheduler");
|
||||
this.ticker = Preconditions.checkNotNull(ticker, "ticker");
|
||||
this.keepAliveDelayInNanos = keepAliveDelayInNanos;
|
||||
this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
|
||||
this.keepAliveDuringTransportIdle = keepAliveDuringTransportIdle;
|
||||
nextKeepaliveTime = ticker.read() + keepAliveDelayInNanos;
|
||||
}
|
||||
|
||||
/** Start keepalive monitoring. */
|
||||
public synchronized void onTransportStarted() {
|
||||
if (keepAliveDuringTransportIdle) {
|
||||
onTransportActive();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transport has received some data so that we can delay sending keepalives.
|
||||
*/
|
||||
|
@ -192,13 +201,16 @@ public class KeepAliveManager {
|
|||
TimeUnit.NANOSECONDS);
|
||||
} else if (state == State.IDLE_AND_PING_SENT) {
|
||||
state = State.PING_SENT;
|
||||
}
|
||||
} // Other states are possible when keepAliveDuringTransportIdle == true
|
||||
}
|
||||
|
||||
/**
|
||||
* Transport has finished all streams.
|
||||
*/
|
||||
public synchronized void onTransportIdle() {
|
||||
if (keepAliveDuringTransportIdle) {
|
||||
return;
|
||||
}
|
||||
if (state == State.PING_SCHEDULED || state == State.PING_DELAYED) {
|
||||
state = State.IDLE;
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@
|
|||
package io.grpc.internal;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -70,7 +71,8 @@ public final class KeepAliveManagerTest {
|
|||
@Before
|
||||
public void setUp() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
keepAliveManager = new KeepAliveManager(transport, scheduler, ticker, 1000, 2000);
|
||||
keepAliveManager = new KeepAliveManager(transport, scheduler, ticker, 1000, 2000, false);
|
||||
keepAliveManager.onTransportStarted();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -261,6 +263,33 @@ public final class KeepAliveManagerTest {
|
|||
verify(scheduler, times(1)).schedule(isA(Runnable.class), isA(Long.class), isA(TimeUnit.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void transportGoesIdle_doesntCauseIdleWhenEnabled() {
|
||||
keepAliveManager.onTransportShutdown();
|
||||
keepAliveManager = new KeepAliveManager(transport, scheduler, ticker, 1000, 2000, true);
|
||||
keepAliveManager.onTransportStarted();
|
||||
|
||||
// Keepalive scheduling should have started immediately.
|
||||
ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
|
||||
verify(scheduler).schedule(runnableCaptor.capture(), isA(Long.class),
|
||||
isA(TimeUnit.class));
|
||||
Runnable sendPing = runnableCaptor.getValue();
|
||||
|
||||
keepAliveManager.onTransportActive();
|
||||
|
||||
// Transport becomes idle. Should not impact the sending of the ping.
|
||||
keepAliveManager.onTransportIdle();
|
||||
sendPing.run();
|
||||
// Ping was sent.
|
||||
verify(transport).ping(isA(ClientTransport.PingCallback.class), isA(Executor.class));
|
||||
// Shutdown is scheduled.
|
||||
verify(scheduler, times(2)).schedule(runnableCaptor.capture(), isA(Long.class),
|
||||
isA(TimeUnit.class));
|
||||
// Shutdown is triggered.
|
||||
runnableCaptor.getValue().run();
|
||||
verify(transport).shutdownNow(any(Status.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void transportGoesIdleAfterPingSent() {
|
||||
// Transport becomes active. We should schedule keepalive pings.
|
||||
|
|
|
@ -232,7 +232,8 @@ class NettyClientTransport implements ConnectionClientTransport {
|
|||
|
||||
if (enableKeepAlive) {
|
||||
keepAliveManager = new KeepAliveManager(this, channel.eventLoop(), keepAliveDelayNanos,
|
||||
keepAliveTimeoutNanos);
|
||||
keepAliveTimeoutNanos, false);
|
||||
keepAliveManager.onTransportStarted();
|
||||
}
|
||||
|
||||
return null;
|
||||
|
|
|
@ -370,7 +370,8 @@ class OkHttpClientTransport implements ConnectionClientTransport {
|
|||
if (enableKeepAlive) {
|
||||
scheduler = SharedResourceHolder.get(TIMER_SERVICE);
|
||||
keepAliveManager = new KeepAliveManager(this, scheduler, keepAliveDelayNanos,
|
||||
keepAliveTimeoutNanos);
|
||||
keepAliveTimeoutNanos, false);
|
||||
keepAliveManager.onTransportStarted();
|
||||
}
|
||||
|
||||
frameWriter = new AsyncFrameWriter(this, serializingExecutor);
|
||||
|
|
Loading…
Reference in New Issue