core: InternalSubchannel: the new TransportSet. (#2427)

This is the first step of a major refactor for the LoadBalancer-related part of Channel impl (#1600). It forks TransportSet into InternalSubchannel and makes changes on it.

What's changed:

- InternalSubchannel no longer has delayed transport, thus will not buffer
  streams when a READY real transport is absent.
- When there isn't a READ real transport, obtainActiveTransport() will
  return null.
- InternalSubchannel is no longer a ManagedChannel
- Overhauled Callback interface, with onStateChange() replacing the
  adhoc transport event methods.
- Call out channelExecutor, which is a serializing executor that runs
  the Callback.

The first commit is an unmodified copy of the files that are being forked. Please review the second commit which changes on the forked files.
This commit is contained in:
Kun Zhang 2016-11-22 22:32:27 -08:00 committed by GitHub
parent 325c965fdc
commit 6a04022d7a
7 changed files with 1478 additions and 6 deletions

View File

@ -0,0 +1,111 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import com.google.common.base.Preconditions;
import io.grpc.Status;
/**
* A tuple of a {@link ConnectivityState} and its associated {@link Status}.
*
* <p>If the state is {@code TRANSIENT_FAILURE}, the status is never {@code OK}. For other states,
* the status is always {@code OK}.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
public final class ConnectivityStateInfo {
private final ConnectivityState state;
private final Status status;
/**
* Returns an instance for a state that is not {@code TRANSIENT_FAILURE}.
*
* @throws IllegalArgumentException if {@code state} is {@code TRANSIENT_FAILURE}.
*/
public static ConnectivityStateInfo forNonError(ConnectivityState state) {
Preconditions.checkArgument(state != ConnectivityState.TRANSIENT_FAILURE,
"state is TRANSIENT_ERROR. Use forError() instead");
return new ConnectivityStateInfo(state, Status.OK);
}
/**
* Returns an instance for {@code TRANSIENT_FAILURE}, associated with an error status.
*/
public static ConnectivityStateInfo forTransientFailure(Status error) {
Preconditions.checkArgument(!error.isOk(), "The error status must not be OK");
return new ConnectivityStateInfo(ConnectivityState.TRANSIENT_FAILURE, error);
}
/**
* Returns the state.
*/
public ConnectivityState getState() {
return state;
}
/**
* Returns the status associated with the state.
*
* <p>If the state is {@code TRANSIENT_FAILURE}, the status is never {@code OK}. For other
* states, the status is always {@code OK}.
*/
public Status getStatus() {
return status;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof ConnectivityStateInfo)) {
return false;
}
ConnectivityStateInfo o = (ConnectivityStateInfo) other;
return state.equals(o.state) && status.equals(o.status);
}
@Override
public int hashCode() {
return state.hashCode() ^ status.hashCode();
}
@Override
public String toString() {
if (status.isOk()) {
return state.toString();
}
return state + "(" + status + ")";
}
private ConnectivityStateInfo(ConnectivityState state, Status status) {
this.state = Preconditions.checkNotNull(state, "state is null");
this.status = Preconditions.checkNotNull(status, "status is null");
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import java.util.HashSet;
import javax.annotation.concurrent.NotThreadSafe;
/**
* Aggregates the in-use state of a set of objects.
*/
@NotThreadSafe
abstract class InUseStateAggregator2<T> {
private final HashSet<T> inUseObjects = new HashSet<T>();
/**
* Update the in-use state of an object. Initially no object is in use.
*
* <p>This may call into {@link #handleInUse} or {@link #handleNotInUse} when appropriate.
*/
final void updateObjectInUse(T object, boolean inUse) {
int origSize = inUseObjects.size();
if (inUse) {
inUseObjects.add(object);
if (origSize == 0) {
handleInUse();
}
} else {
boolean removed = inUseObjects.remove(object);
if (removed && origSize == 1) {
handleNotInUse();
}
}
}
final boolean isInUse() {
return !inUseObjects.isEmpty();
}
/**
* Called when the aggregated in-use state has changed to true, which means at least one object is
* in use.
*/
abstract void handleInUse();
/**
* Called when the aggregated in-use state has changed to false, which means no object is in use.
*
* <p>This method is called under the lock returned by {@link #getLock}.
*/
abstract void handleNotInUse();
}

View File

@ -0,0 +1,492 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.Status;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
/**
* Transports for a single {@link SocketAddress}.
*
* <p>This is the next version of {@link TransportSet} in development.
*/
@ThreadSafe
final class InternalSubchannel implements WithLogId {
private static final Logger log = Logger.getLogger(InternalSubchannel.class.getName());
private final Object lock = new Object();
private final EquivalentAddressGroup addressGroup;
private final String authority;
private final String userAgent;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Callback callback;
private final ClientTransportFactory transportFactory;
private final ScheduledExecutorService scheduledExecutor;
// A serializing executor shared across the Channel
//
// TODO(zhangkun83): decide the type of Channel Executor. I considered a SerializingExecutor
// based on the app executor, but it seems abusive because the app executor is intended for app
// logic, not for channel bookkeeping. We don't want channel bookkeeping logic to contend for
// threads with app logic, which may increase latency or even cause starvation. Instead, we
// should consider a thread-less Executor after the refactor of ManagedChannelImpl is done.
//
// NOTE: there are cases where channelExecutor.execute() is run under "lock". This will add risk
// of deadlock if channelExecutor is based on a direct executor. Thread-less executor wouldn't
// have such problem.
private final Executor channelExecutor;
@GuardedBy("lock")
private int nextAddressIndex;
/**
* The policy to control back off between reconnects. Non-{@code null} when last connect failed.
*/
@GuardedBy("lock")
private BackoffPolicy reconnectPolicy;
/**
* Timer monitoring duration since entering CONNECTING state.
*/
@GuardedBy("lock")
private final Stopwatch connectingTimer;
@GuardedBy("lock")
@Nullable
private ScheduledFuture<?> reconnectTask;
/**
* All transports that are not terminated. At the very least the value of {@link #activeTransport}
* will be present, but previously used transports that still have streams or are stopping may
* also be present.
*/
@GuardedBy("lock")
private final Collection<ManagedClientTransport> transports =
new ArrayList<ManagedClientTransport>();
// Must only be used from channelExecutor
private final InUseStateAggregator2<ManagedClientTransport> inUseStateAggregator =
new InUseStateAggregator2<ManagedClientTransport>() {
@Override
void handleInUse() {
callback.onInUse(InternalSubchannel.this);
}
@Override
void handleNotInUse() {
callback.onNotInUse(InternalSubchannel.this);
}
};
/**
* The to-be active transport, which is not ready yet.
*/
@GuardedBy("lock")
@Nullable
private ConnectionClientTransport pendingTransport;
/**
* The transport for new outgoing requests. 'lock' must be held when assigning to it. Non-null
* only in READY state.
*/
@Nullable
private volatile ManagedClientTransport activeTransport;
@GuardedBy("lock")
private ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE);
InternalSubchannel(EquivalentAddressGroup addressGroup, String authority, String userAgent,
BackoffPolicy.Provider backoffPolicyProvider,
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
Supplier<Stopwatch> stopwatchSupplier, Executor channelExecutor, Callback callback) {
this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup");
this.authority = authority;
this.userAgent = userAgent;
this.backoffPolicyProvider = backoffPolicyProvider;
this.transportFactory = transportFactory;
this.scheduledExecutor = scheduledExecutor;
this.connectingTimer = stopwatchSupplier.get();
this.channelExecutor = channelExecutor;
this.callback = callback;
}
/**
* Returns a READY transport that will be used to create new streams.
*
* <p>Returns {@code null} if the state is not READY.
*/
@Nullable
final ClientTransport obtainActiveTransport() {
ClientTransport savedTransport = activeTransport;
if (savedTransport != null) {
return savedTransport;
}
Runnable runnable = null;
synchronized (lock) {
savedTransport = activeTransport;
// Check again, since it could have changed before acquiring the lock
if (savedTransport != null) {
return savedTransport;
}
if (state.getState() == IDLE) {
gotoNonErrorState(CONNECTING);
runnable = startNewTransport();
}
}
if (runnable != null) {
runnable.run();
}
return null;
}
@CheckReturnValue
@GuardedBy("lock")
private Runnable startNewTransport() {
Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
if (nextAddressIndex == 0) {
connectingTimer.reset().start();
}
List<SocketAddress> addrs = addressGroup.getAddresses();
final SocketAddress address = addrs.get(nextAddressIndex++);
if (nextAddressIndex >= addrs.size()) {
nextAddressIndex = 0;
}
ConnectionClientTransport transport =
transportFactory.newClientTransport(address, authority, userAgent);
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Created {1} for {2}",
new Object[] {getLogId(), transport.getLogId(), address});
}
pendingTransport = transport;
transports.add(transport);
return transport.start(new TransportListener(transport, address));
}
/**
* Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
* @param status the causal status when the channel begins transition to
* TRANSIENT_FAILURE.
*/
@GuardedBy("lock")
private void scheduleBackoff(final Status status) {
class EndOfCurrentBackoff implements Runnable {
@Override
public void run() {
try {
Runnable runnable = null;
synchronized (lock) {
reconnectTask = null;
if (state.getState() == SHUTDOWN) {
// Even though shutdown() will cancel this task, the task may have already started
// when it's being cancelled.
return;
}
gotoNonErrorState(CONNECTING);
runnable = startNewTransport();
}
if (runnable != null) {
runnable.run();
}
} catch (Throwable t) {
log.log(Level.WARNING, "Exception handling end of backoff", t);
}
}
}
gotoState(ConnectivityStateInfo.forTransientFailure(status));
if (reconnectPolicy == null) {
reconnectPolicy = backoffPolicyProvider.get();
}
long delayMillis =
reconnectPolicy.nextBackoffMillis() - connectingTimer.elapsed(TimeUnit.MILLISECONDS);
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ms",
new Object[]{getLogId(), delayMillis});
}
Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done");
reconnectTask = scheduledExecutor.schedule(
new LogExceptionRunnable(new EndOfCurrentBackoff()),
delayMillis,
TimeUnit.MILLISECONDS);
}
@GuardedBy("lock")
private void gotoNonErrorState(ConnectivityState newState) {
gotoState(ConnectivityStateInfo.forNonError(newState));
}
@GuardedBy("lock")
private void gotoState(final ConnectivityStateInfo newState) {
if (state.getState() != newState.getState()) {
Preconditions.checkState(state.getState() != SHUTDOWN,
"Cannot transition out of SHUTDOWN to " + newState);
state = newState;
channelExecutor.execute(new Runnable() {
@Override
public void run() {
callback.onStateChange(InternalSubchannel.this, newState);
}
});
}
}
public void shutdown() {
ManagedClientTransport savedActiveTransport;
ConnectionClientTransport savedPendingTransport;
synchronized (lock) {
if (state.getState() == SHUTDOWN) {
return;
}
gotoNonErrorState(SHUTDOWN);
savedActiveTransport = activeTransport;
savedPendingTransport = pendingTransport;
activeTransport = null;
pendingTransport = null;
if (transports.isEmpty()) {
handleTermination();
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Terminated in shutdown()", getLogId());
}
} // else: the callback will be run once all transports have been terminated
cancelReconnectTask();
}
if (savedActiveTransport != null) {
savedActiveTransport.shutdown();
}
if (savedPendingTransport != null) {
savedPendingTransport.shutdown();
}
}
// May be called under lock.
private void handleTermination() {
channelExecutor.execute(new Runnable() {
@Override
public void run() {
callback.onTerminated(InternalSubchannel.this);
}
});
}
private void handleTransportInUseState(
final ManagedClientTransport transport, final boolean inUse) {
channelExecutor.execute(new Runnable() {
@Override
public void run() {
inUseStateAggregator.updateObjectInUse(transport, inUse);
}
});
}
void shutdownNow(Status reason) {
shutdown();
Collection<ManagedClientTransport> transportsCopy;
synchronized (lock) {
transportsCopy = new ArrayList<ManagedClientTransport>(transports);
}
for (ManagedClientTransport transport : transportsCopy) {
transport.shutdownNow(reason);
}
}
@GuardedBy("lock")
private void cancelReconnectTask() {
if (reconnectTask != null) {
reconnectTask.cancel(false);
reconnectTask = null;
}
}
@Override
public String getLogId() {
return GrpcUtil.getLogId(this);
}
@VisibleForTesting
ConnectivityState getState() {
synchronized (lock) {
return state.getState();
}
}
/** Listener for real transports. */
private class TransportListener implements ManagedClientTransport.Listener {
final ManagedClientTransport transport;
final SocketAddress address;
TransportListener(ManagedClientTransport transport, SocketAddress address) {
this.transport = transport;
this.address = address;
}
@Override
public void transportReady() {
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] {1} for {2} is ready",
new Object[] {getLogId(), transport.getLogId(), address});
}
ConnectivityState savedState;
synchronized (lock) {
savedState = state.getState();
reconnectPolicy = null;
nextAddressIndex = 0;
if (savedState == SHUTDOWN) {
// activeTransport should have already been set to null by shutdown(). We keep it null.
Preconditions.checkState(activeTransport == null,
"Unexpected non-null activeTransport");
} else if (pendingTransport == transport) {
gotoNonErrorState(READY);
activeTransport = transport;
pendingTransport = null;
}
}
if (savedState == SHUTDOWN) {
transport.shutdown();
}
}
@Override
public void transportInUse(boolean inUse) {
handleTransportInUseState(transport, inUse);
}
@Override
public void transportShutdown(Status s) {
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] {1} for {2} is being shutdown with status {3}",
new Object[] {getLogId(), transport.getLogId(), address, s});
}
Runnable runnable = null;
synchronized (lock) {
if (state.getState() == SHUTDOWN) {
return;
}
if (activeTransport == transport) {
gotoNonErrorState(IDLE);
activeTransport = null;
} else if (pendingTransport == transport) {
Preconditions.checkState(state.getState() == CONNECTING,
"Expected state is CONNECTING, actual state is %s", state.getState());
// Continue reconnect if there are still addresses to try.
if (nextAddressIndex == 0) {
// Initiate backoff
// Transition to TRANSIENT_FAILURE
scheduleBackoff(s);
} else {
runnable = startNewTransport();
}
}
}
if (runnable != null) {
runnable.run();
}
}
@Override
public void transportTerminated() {
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] {1} for {2} is terminated",
new Object[] {getLogId(), transport.getLogId(), address});
}
boolean terminated = false;
handleTransportInUseState(transport, false);
synchronized (lock) {
transports.remove(transport);
if (state.getState() == SHUTDOWN && transports.isEmpty()) {
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Terminated in transportTerminated()", getLogId());
}
handleTermination();
}
}
Preconditions.checkState(activeTransport != transport,
"activeTransport still points to this transport. "
+ "Seems transportShutdown() was not called.");
}
}
// All methods are called in channelExecutor, which is a serializing executor.
abstract static class Callback {
/**
* Called when the subchannel is terminated, which means it's shut down and all transports
* have been terminated.
*/
public void onTerminated(InternalSubchannel is) { }
/**
* Called when the subchannel's connectivity state has changed.
*/
public void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { }
/**
* Called when the subchannel's in-use state has changed to true, which means at least one
* transport is in use.
*/
public void onInUse(InternalSubchannel is) { }
/**
* Called when the subchannel's in-use state has changed to false, which means no transport is
* in use.
*/
public void onNotInUse(InternalSubchannel is) { }
}
}

View File

@ -0,0 +1,93 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotSame;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Unit tests for {@link ConnectivityStateInfo}. */
@RunWith(JUnit4.class)
public class ConnectivityStateInfoTest {
@Test
public void forNonError() {
ConnectivityStateInfo info = ConnectivityStateInfo.forNonError(ConnectivityState.IDLE);
assertEquals(ConnectivityState.IDLE, info.getState());
assertEquals(Status.OK, info.getStatus());
}
@Test(expected = IllegalArgumentException.class)
public void forNonErrorInvalid() {
ConnectivityStateInfo.forNonError(ConnectivityState.TRANSIENT_FAILURE);
}
@Test
public void forTransientFailure() {
ConnectivityStateInfo info = ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE);
assertEquals(ConnectivityState.TRANSIENT_FAILURE, info.getState());
assertEquals(Status.UNAVAILABLE, info.getStatus());
}
@Test(expected = IllegalArgumentException.class)
public void forTransientFailureInvalid() {
ConnectivityStateInfo.forTransientFailure(Status.OK);
}
@Test
public void equality() {
ConnectivityStateInfo info1 = ConnectivityStateInfo.forNonError(ConnectivityState.IDLE);
ConnectivityStateInfo info2 = ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING);
ConnectivityStateInfo info3 = ConnectivityStateInfo.forNonError(ConnectivityState.IDLE);
ConnectivityStateInfo info4 = ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE);
ConnectivityStateInfo info5 = ConnectivityStateInfo.forTransientFailure(Status.INTERNAL);
ConnectivityStateInfo info6 = ConnectivityStateInfo.forTransientFailure(Status.INTERNAL);
assertEquals(info1, info3);
assertNotSame(info1, info3);
assertEquals(info1.hashCode(), info3.hashCode());
assertEquals(info5, info6);
assertEquals(info5.hashCode(), info6.hashCode());
assertNotSame(info5, info6);
assertNotEquals(info1, info2);
assertNotEquals(info1, info4);
assertNotEquals(info4, info6);
assertFalse(info1.equals(null));
assertFalse(info1.equals(this));
}
}

View File

@ -76,9 +76,9 @@ public final class FakeClock {
private long currentTimeNanos;
private class ScheduledTask extends AbstractFuture<Void> implements ScheduledFuture<Void> {
final Runnable command;
final long dueTimeNanos;
public class ScheduledTask extends AbstractFuture<Void> implements ScheduledFuture<Void> {
public final Runnable command;
public final long dueTimeNanos;
ScheduledTask(long dueTimeNanos, Runnable command) {
this.dueTimeNanos = dueTimeNanos;

View File

@ -0,0 +1,688 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.Status;
import io.grpc.internal.TestUtils.MockClientTransportInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Unit tests for {@link InternalSubchannel}.
*
* <p>It only tests the logic that is not covered by {@link ManagedChannelImplTransportManagerTest}.
*/
@RunWith(JUnit4.class)
public class InternalSubchannelTest {
private static final String AUTHORITY = "fakeauthority";
private static final String USER_AGENT = "mosaic";
private static final ConnectivityStateInfo UNAVAILABLE_STATE =
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE);
private static final ConnectivityStateInfo RESOURCE_EXHAUSTED_STATE =
ConnectivityStateInfo.forTransientFailure(Status.RESOURCE_EXHAUSTED);
// For scheduled executor
private final FakeClock fakeClock = new FakeClock();
// For channelExecutor
private final FakeClock fakeExecutor = new FakeClock();
private final SerializingExecutor channelExecutor =
new SerializingExecutor(fakeExecutor.getScheduledExecutorService());
@Mock private BackoffPolicy mockBackoffPolicy1;
@Mock private BackoffPolicy mockBackoffPolicy2;
@Mock private BackoffPolicy mockBackoffPolicy3;
@Mock private BackoffPolicy.Provider mockBackoffPolicyProvider;
@Mock private ClientTransportFactory mockTransportFactory;
private final LinkedList<String> callbackInvokes = new LinkedList<String>();
private final InternalSubchannel.Callback mockInternalSubchannelCallback =
new InternalSubchannel.Callback() {
@Override
public void onTerminated(InternalSubchannel is) {
assertSame(internalSubchannel, is);
callbackInvokes.add("onTerminated");
}
@Override
public void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
assertSame(internalSubchannel, is);
callbackInvokes.add("onStateChange:" + newState);
}
@Override
public void onInUse(InternalSubchannel is) {
assertSame(internalSubchannel, is);
callbackInvokes.add("onInUse");
}
@Override
public void onNotInUse(InternalSubchannel is) {
assertSame(internalSubchannel, is);
callbackInvokes.add("onNotInUse");
}
};
private InternalSubchannel internalSubchannel;
private EquivalentAddressGroup addressGroup;
private BlockingQueue<MockClientTransportInfo> transports;
@Before public void setUp() {
MockitoAnnotations.initMocks(this);
when(mockBackoffPolicyProvider.get())
.thenReturn(mockBackoffPolicy1, mockBackoffPolicy2, mockBackoffPolicy3);
when(mockBackoffPolicy1.nextBackoffMillis()).thenReturn(10L, 100L);
when(mockBackoffPolicy2.nextBackoffMillis()).thenReturn(10L, 100L);
when(mockBackoffPolicy3.nextBackoffMillis()).thenReturn(10L, 100L);
transports = TestUtils.captureTransports(mockTransportFactory);
}
@After public void noMorePendingTasks() {
assertEquals(0, fakeClock.numPendingTasks());
assertEquals(0, fakeExecutor.numPendingTasks());
}
@Test public void singleAddressReconnect() {
SocketAddress addr = mock(SocketAddress.class);
createInternalSubchannel(addr);
assertEquals(ConnectivityState.IDLE, internalSubchannel.getState());
// Invocation counters
int transportsCreated = 0;
int backoff1Consulted = 0;
int backoff2Consulted = 0;
int backoffReset = 0;
// First attempt
assertEquals(ConnectivityState.IDLE, internalSubchannel.getState());
assertNoCallbackInvoke();
assertNull(internalSubchannel.obtainActiveTransport());
assertExactCallbackInvokes("onStateChange:CONNECTING");
assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport(addr, AUTHORITY, USER_AGENT);
// Fail this one. Because there is only one address to try, enter TRANSIENT_FAILURE.
assertNoCallbackInvoke();
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState());
assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
// Backoff reset and using first back-off value interval
verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
// Second attempt
// Transport creation doesn't happen until time is due
fakeClock.forwardMillis(9);
assertNull(internalSubchannel.obtainActiveTransport());
verify(mockTransportFactory, times(transportsCreated))
.newClientTransport(addr, AUTHORITY, USER_AGENT);
assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState());
assertNoCallbackInvoke();
fakeClock.forwardMillis(1);
assertExactCallbackInvokes("onStateChange:CONNECTING");
assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport(addr, AUTHORITY, USER_AGENT);
// Fail this one too
assertNoCallbackInvoke();
// Here we use a different status from the first failure, and verify that it's passed to
// the callback.
transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED);
assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState());
assertExactCallbackInvokes("onStateChange:" + RESOURCE_EXHAUSTED_STATE);
// Second back-off interval
verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
// Third attempt
// Transport creation doesn't happen until time is due
fakeClock.forwardMillis(99);
assertNull(internalSubchannel.obtainActiveTransport());
verify(mockTransportFactory, times(transportsCreated))
.newClientTransport(addr, AUTHORITY, USER_AGENT);
assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState());
assertNoCallbackInvoke();
fakeClock.forwardMillis(1);
assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState());
assertExactCallbackInvokes("onStateChange:CONNECTING");
assertNull(internalSubchannel.obtainActiveTransport());
verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport(addr, AUTHORITY, USER_AGENT);
// Let this one succeed, will enter READY state.
assertNoCallbackInvoke();
transports.peek().listener.transportReady();
assertExactCallbackInvokes("onStateChange:READY");
assertEquals(ConnectivityState.READY, internalSubchannel.getState());
assertSame(transports.peek().transport, internalSubchannel.obtainActiveTransport());
// Close the READY transport, will enter IDLE state.
assertNoCallbackInvoke();
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertEquals(ConnectivityState.IDLE, internalSubchannel.getState());
assertExactCallbackInvokes("onStateChange:IDLE");
// Back-off is reset, and the next attempt will happen immediately
assertNull(internalSubchannel.obtainActiveTransport());
assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState());
assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport(addr, AUTHORITY, USER_AGENT);
// Final checks for consultations on back-off policies
verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffMillis();
}
@Test public void twoAddressesReconnect() {
SocketAddress addr1 = mock(SocketAddress.class);
SocketAddress addr2 = mock(SocketAddress.class);
createInternalSubchannel(addr1, addr2);
assertEquals(ConnectivityState.IDLE, internalSubchannel.getState());
// Invocation counters
int transportsAddr1 = 0;
int transportsAddr2 = 0;
int backoff1Consulted = 0;
int backoff2Consulted = 0;
int backoff3Consulted = 0;
int backoffReset = 0;
// First attempt
assertNoCallbackInvoke();
assertNull(internalSubchannel.obtainActiveTransport());
assertExactCallbackInvokes("onStateChange:CONNECTING");
assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory, times(++transportsAddr1))
.newClientTransport(addr1, AUTHORITY, USER_AGENT);
// Let this one fail without success
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Still in CONNECTING
assertNull(internalSubchannel.obtainActiveTransport());
assertNoCallbackInvoke();
assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState());
// Second attempt will start immediately. Still no back-off policy.
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2))
.newClientTransport(addr2, AUTHORITY, USER_AGENT);
assertNull(internalSubchannel.obtainActiveTransport());
// Fail this one too
assertNoCallbackInvoke();
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// All addresses have failed. Delayed transport will be in back-off interval.
assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState());
assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
// Backoff reset and first back-off interval begins
verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
// No reconnect during TRANSIENT_FAILURE even when requested.
assertNull(internalSubchannel.obtainActiveTransport());
assertNoCallbackInvoke();
assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState());
// Third attempt is the first address, thus controlled by the first back-off interval.
fakeClock.forwardMillis(9);
verify(mockTransportFactory, times(transportsAddr1))
.newClientTransport(addr1, AUTHORITY, USER_AGENT);
assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState());
assertNoCallbackInvoke();
fakeClock.forwardMillis(1);
assertExactCallbackInvokes("onStateChange:CONNECTING");
assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory, times(++transportsAddr1))
.newClientTransport(addr1, AUTHORITY, USER_AGENT);
// Fail this one too
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState());
// Forth attempt will start immediately. Keep back-off policy.
assertNull(internalSubchannel.obtainActiveTransport());
assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState());
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2))
.newClientTransport(addr2, AUTHORITY, USER_AGENT);
// Fail this one too
assertNoCallbackInvoke();
transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED);
// All addresses have failed again. Delayed transport will be in back-off interval.
assertExactCallbackInvokes("onStateChange:" + RESOURCE_EXHAUSTED_STATE);
assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState());
// Second back-off interval begins
verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
// Fifth attempt for the first address, thus controlled by the second back-off interval.
assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState());
fakeClock.forwardMillis(99);
verify(mockTransportFactory, times(transportsAddr1))
.newClientTransport(addr1, AUTHORITY, USER_AGENT);
assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState());
assertNoCallbackInvoke();
fakeClock.forwardMillis(1);
assertExactCallbackInvokes("onStateChange:CONNECTING");
assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory, times(++transportsAddr1))
.newClientTransport(addr1, AUTHORITY, USER_AGENT);
// Let it through
assertNoCallbackInvoke();
transports.peek().listener.transportReady();
assertExactCallbackInvokes("onStateChange:READY");
assertEquals(ConnectivityState.READY, internalSubchannel.getState());
assertSame(transports.peek().transport, internalSubchannel.obtainActiveTransport());
// Then close it.
assertNoCallbackInvoke();
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertExactCallbackInvokes("onStateChange:IDLE");
assertEquals(ConnectivityState.IDLE, internalSubchannel.getState());
// First attempt after a successful connection. Old back-off policy should be ignored, but there
// is not yet a need for a new one. Start from the first address.
assertNull(internalSubchannel.obtainActiveTransport());
assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState());
assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1))
.newClientTransport(addr1, AUTHORITY, USER_AGENT);
// Fail the transport
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState());
// Second attempt will start immediately. Still no new back-off policy.
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2))
.newClientTransport(addr2, AUTHORITY, USER_AGENT);
// Fail this one too
assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState());
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// All addresses have failed. Enter TRANSIENT_FAILURE. Back-off in effect.
assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState());
// Back-off reset and first back-off interval begins
verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
// Third attempt is the first address, thus controlled by the first back-off interval.
fakeClock.forwardMillis(9);
verify(mockTransportFactory, times(transportsAddr1))
.newClientTransport(addr1, AUTHORITY, USER_AGENT);
assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState());
assertNoCallbackInvoke();
fakeClock.forwardMillis(1);
assertExactCallbackInvokes("onStateChange:CONNECTING");
assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory, times(++transportsAddr1))
.newClientTransport(addr1, AUTHORITY, USER_AGENT);
// Final checks on invocations on back-off policies
verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffMillis();
verify(mockBackoffPolicy3, times(backoff3Consulted)).nextBackoffMillis();
}
@Test
public void connectIsLazy() {
SocketAddress addr = mock(SocketAddress.class);
createInternalSubchannel(addr);
// Invocation counters
int transportsCreated = 0;
// Won't connect until requested
verify(mockTransportFactory, times(transportsCreated))
.newClientTransport(addr, AUTHORITY, USER_AGENT);
// First attempt
internalSubchannel.obtainActiveTransport();
assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport(addr, AUTHORITY, USER_AGENT);
// Fail this one
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
// Will always reconnect after back-off
fakeClock.forwardMillis(10);
assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport(addr, AUTHORITY, USER_AGENT);
// Make this one proceed
transports.peek().listener.transportReady();
assertExactCallbackInvokes("onStateChange:READY");
// Then go-away
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertExactCallbackInvokes("onStateChange:IDLE");
// No scheduled tasks that would ever try to reconnect ...
assertEquals(0, fakeClock.numPendingTasks());
assertEquals(0, fakeExecutor.numPendingTasks());
// ... until it's requested.
internalSubchannel.obtainActiveTransport();
assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport(addr, AUTHORITY, USER_AGENT);
}
@Test
public void shutdownWhenReady() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createInternalSubchannel(addr);
internalSubchannel.obtainActiveTransport();
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportReady();
assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
internalSubchannel.shutdown();
verify(transportInfo.transport).shutdown();
assertExactCallbackInvokes("onStateChange:SHUTDOWN");
transportInfo.listener.transportTerminated();
assertExactCallbackInvokes("onTerminated");
verify(transportInfo.transport, never()).shutdownNow(any(Status.class));
}
@Test
public void shutdownBeforeTransportCreated() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createInternalSubchannel(addr);
// First transport is created immediately
internalSubchannel.obtainActiveTransport();
assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory).newClientTransport(addr, AUTHORITY, USER_AGENT);
// Fail this one
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
transportInfo.listener.transportTerminated();
// Entering TRANSIENT_FAILURE, waiting for back-off
assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
// Save the reconnectTask before shutting down
FakeClock.ScheduledTask reconnectTask = null;
for (FakeClock.ScheduledTask task : fakeClock.getPendingTasks()) {
if (task.command.toString().contains("EndOfCurrentBackoff")) {
assertNull("There shouldn't be more than one reconnectTask", reconnectTask);
assertFalse(task.isDone());
reconnectTask = task;
}
}
assertNotNull("There should be at least one reconnectTask", reconnectTask);
// Shut down InternalSubchannel before the transport is created.
internalSubchannel.shutdown();
assertTrue(reconnectTask.isCancelled());
// InternalSubchannel terminated promptly.
assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated");
// Simulate a race between reconnectTask cancellation and execution -- the task runs anyway.
// This should not lead to the creation of a new transport.
reconnectTask.command.run();
// Futher call to obtainActiveTransport() is no-op.
assertNull(internalSubchannel.obtainActiveTransport());
assertEquals(ConnectivityState.SHUTDOWN, internalSubchannel.getState());
assertNoCallbackInvoke();
// No more transports will be created.
fakeClock.forwardMillis(10000);
assertEquals(ConnectivityState.SHUTDOWN, internalSubchannel.getState());
verifyNoMoreInteractions(mockTransportFactory);
assertEquals(0, transports.size());
assertNoCallbackInvoke();
}
@Test
public void shutdownBeforeTransportReady() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createInternalSubchannel(addr);
internalSubchannel.obtainActiveTransport();
assertExactCallbackInvokes("onStateChange:CONNECTING");
MockClientTransportInfo transportInfo = transports.poll();
// Shutdown the InternalSubchannel before the pending transport is ready
assertNull(internalSubchannel.obtainActiveTransport());
internalSubchannel.shutdown();
assertExactCallbackInvokes("onStateChange:SHUTDOWN");
// The transport should've been shut down even though it's not the active transport yet.
verify(transportInfo.transport).shutdown();
transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
assertNoCallbackInvoke();
transportInfo.listener.transportTerminated();
assertExactCallbackInvokes("onTerminated");
assertEquals(ConnectivityState.SHUTDOWN, internalSubchannel.getState());
}
@Test
public void shutdownNow() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createInternalSubchannel(addr);
internalSubchannel.obtainActiveTransport();
MockClientTransportInfo t1 = transports.poll();
t1.listener.transportReady();
assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
t1.listener.transportShutdown(Status.UNAVAILABLE);
assertExactCallbackInvokes("onStateChange:IDLE");
internalSubchannel.obtainActiveTransport();
assertExactCallbackInvokes("onStateChange:CONNECTING");
MockClientTransportInfo t2 = transports.poll();
Status status = Status.UNAVAILABLE.withDescription("Requested");
internalSubchannel.shutdownNow(status);
verify(t1.transport).shutdownNow(same(status));
verify(t2.transport).shutdownNow(same(status));
assertExactCallbackInvokes("onStateChange:SHUTDOWN");
}
@Test
public void obtainTransportAfterShutdown() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createInternalSubchannel(addr);
internalSubchannel.shutdown();
assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated");
assertEquals(ConnectivityState.SHUTDOWN, internalSubchannel.getState());
assertNull(internalSubchannel.obtainActiveTransport());
verify(mockTransportFactory, times(0)).newClientTransport(addr, AUTHORITY, USER_AGENT);
assertNoCallbackInvoke();
assertEquals(ConnectivityState.SHUTDOWN, internalSubchannel.getState());
}
@Test
public void logId() {
createInternalSubchannel(mock(SocketAddress.class));
assertEquals("InternalSubchannel@" + Integer.toHexString(internalSubchannel.hashCode()),
internalSubchannel.getLogId());
}
@Test
public void inUseState() {
SocketAddress addr = mock(SocketAddress.class);
createInternalSubchannel(addr);
internalSubchannel.obtainActiveTransport();
MockClientTransportInfo t0 = transports.poll();
t0.listener.transportReady();
assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
t0.listener.transportInUse(true);
assertExactCallbackInvokes("onInUse");
t0.listener.transportInUse(false);
assertExactCallbackInvokes("onNotInUse");
t0.listener.transportInUse(true);
assertExactCallbackInvokes("onInUse");
t0.listener.transportShutdown(Status.UNAVAILABLE);
assertExactCallbackInvokes("onStateChange:IDLE");
assertNull(internalSubchannel.obtainActiveTransport());
MockClientTransportInfo t1 = transports.poll();
t1.listener.transportReady();
assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
t1.listener.transportInUse(true);
// InternalSubchannel is already in-use, thus doesn't call the callback
assertNoCallbackInvoke();
t1.listener.transportInUse(false);
// t0 is still in-use
assertNoCallbackInvoke();
t0.listener.transportInUse(false);
assertExactCallbackInvokes("onNotInUse");
}
@Test
public void transportTerminateWithoutExitingInUse() {
// An imperfect transport that terminates without going out of in-use. InternalSubchannel will
// clear the in-use bit for it.
SocketAddress addr = mock(SocketAddress.class);
createInternalSubchannel(addr);
internalSubchannel.obtainActiveTransport();
MockClientTransportInfo t0 = transports.poll();
t0.listener.transportReady();
assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
t0.listener.transportInUse(true);
assertExactCallbackInvokes("onInUse");
t0.listener.transportShutdown(Status.UNAVAILABLE);
assertExactCallbackInvokes("onStateChange:IDLE");
t0.listener.transportTerminated();
assertExactCallbackInvokes("onNotInUse");
}
@Test
public void transportStartReturnsRunnable() {
SocketAddress addr1 = mock(SocketAddress.class);
SocketAddress addr2 = mock(SocketAddress.class);
createInternalSubchannel(addr1, addr2);
final AtomicInteger runnableInvokes = new AtomicInteger(0);
Runnable startRunnable = new Runnable() {
@Override
public void run() {
runnableInvokes.incrementAndGet();
}
};
transports = TestUtils.captureTransports(mockTransportFactory, startRunnable);
assertEquals(0, runnableInvokes.get());
internalSubchannel.obtainActiveTransport();
assertEquals(1, runnableInvokes.get());
internalSubchannel.obtainActiveTransport();
assertEquals(1, runnableInvokes.get());
MockClientTransportInfo t0 = transports.poll();
t0.listener.transportShutdown(Status.UNAVAILABLE);
assertEquals(2, runnableInvokes.get());
// 2nd address: reconnect immediatly
MockClientTransportInfo t1 = transports.poll();
t1.listener.transportShutdown(Status.UNAVAILABLE);
// Addresses exhausted, waiting for back-off.
assertEquals(2, runnableInvokes.get());
// Run out the back-off period
fakeClock.forwardMillis(10);
assertEquals(3, runnableInvokes.get());
// This test doesn't care about scheduled InternalSubchannel callbacks. Clear it up so that
// noMorePendingTasks() won't fail.
fakeExecutor.runDueTasks();
assertEquals(3, runnableInvokes.get());
}
private void createInternalSubchannel(SocketAddress ... addrs) {
addressGroup = new EquivalentAddressGroup(Arrays.asList(addrs));
internalSubchannel = new InternalSubchannel(addressGroup, AUTHORITY, USER_AGENT,
mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(),
fakeClock.getStopwatchSupplier(), channelExecutor, mockInternalSubchannelCallback);
}
private void assertNoCallbackInvoke() {
while (fakeExecutor.runDueTasks() > 0) {}
assertEquals(0, callbackInvokes.size());
}
private void assertExactCallbackInvokes(String ... expectedInvokes) {
// Make sure all callbacks are to run from channelExecutor only.
assertEquals(0, callbackInvokes.size());
while (fakeExecutor.runDueTasks() > 0) {}
assertEquals(Arrays.asList(expectedInvokes), callbackInvokes);
callbackInvokes.clear();
}
}

View File

@ -46,6 +46,7 @@ import org.mockito.stubbing.Answer;
import java.net.SocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
/**
* Common utility methods for tests.
@ -78,6 +79,11 @@ final class TestUtils {
*/
static BlockingQueue<MockClientTransportInfo> captureTransports(
ClientTransportFactory mockTransportFactory) {
return captureTransports(mockTransportFactory, null);
}
static BlockingQueue<MockClientTransportInfo> captureTransports(
ClientTransportFactory mockTransportFactory, @Nullable final Runnable startRunnable) {
final BlockingQueue<MockClientTransportInfo> captor =
new LinkedBlockingQueue<MockClientTransportInfo>();
@ -89,12 +95,12 @@ final class TestUtils {
any(CallOptions.class), any(StatsTraceContext.class)))
.thenReturn(mock(ClientStream.class));
// Save the listener
doAnswer(new Answer<Void>() {
doAnswer(new Answer<Runnable>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
public Runnable answer(InvocationOnMock invocation) throws Throwable {
captor.add(new MockClientTransportInfo(
mockTransport, (ManagedClientTransport.Listener) invocation.getArguments()[0]));
return null;
return startRunnable;
}
}).when(mockTransport).start(any(ManagedClientTransport.Listener.class));
return mockTransport;