mirror of https://github.com/grpc/grpc-java.git
core, grpclb, xds: let leaf LB policies explicitly refresh name resolution when subchannel connection is broken (#8048)
Currently each subchannel implicitly refreshes the name resolution when its state changes to IDLE or TRANSIENT_FAILURE. That is, this feature is built into subchannel's internal implementation. Although it eliminates the burden of having LB implementations refreshing the resolver when connections to backends are broken, this is gives LB policies no chance to disable or override this refresh (e.g., in some complex load balancing hierarchy like xDS, LB policies may embed a resolver inside for resolving backends so the refreshing resolution operation should be hooked to the resolver embedded in the LB policy instead of the one in Channel). In order to make this transition smoothly, we add a check to SubchannelImpl that checks if the LoadBalancer has explicitly called Helper.refreshNameResolution for broken subchannels created by it. If not, it logs a warning and do the refresh. A temporary LoadBalancer.Helper API ignoreRefreshNameResolution() is added to avoid false-positive warnings for xDS that intentionally does not want a refresh. Once the migration is done, this should be deleted.
This commit is contained in:
parent
384f4c401d
commit
9614738a7d
|
@ -1059,6 +1059,26 @@ public abstract class LoadBalancer {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
/**
|
||||
* Historically the channel automatically refreshes name resolution if any subchannel
|
||||
* connection is broken. It's transitioning to let load balancers make the decision. To
|
||||
* avoid silent breakages, the channel checks if {@link #refreshNameResolution} is called
|
||||
* by the load balancer. If not, it will do it and log a warning. This will be removed in
|
||||
* the future and load balancers are completely responsible for triggering the refresh.
|
||||
* See <a href="https://github.com/grpc/grpc-java/issues/8088">#8088</a> for the background.
|
||||
*
|
||||
* <p>This should rarely be used, but sometimes the address for the subchannel wasn't
|
||||
* provided by the name resolver and a refresh needs to be directed somewhere else instead.
|
||||
* Then you can call this method to disable the short-tem check for detecting LoadBalancers
|
||||
* that need to be updated for the new expected behavior.
|
||||
*
|
||||
* @since 1.38.0
|
||||
*/
|
||||
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8088")
|
||||
public void ignoreRefreshNameResolutionCheck() {
|
||||
// no-op
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link SynchronizationContext} that runs tasks in the same Synchronization Context
|
||||
* as that the callback methods on the {@link LoadBalancer} interface are run in.
|
||||
|
|
|
@ -117,6 +117,7 @@ import javax.annotation.concurrent.ThreadSafe;
|
|||
@ThreadSafe
|
||||
final class ManagedChannelImpl extends ManagedChannel implements
|
||||
InternalInstrumented<ChannelStats> {
|
||||
@VisibleForTesting
|
||||
static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
|
||||
|
||||
// Matching this pattern means the target string is a URI target or at least intended to be one.
|
||||
|
@ -1440,6 +1441,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
|
||||
private final class LbHelperImpl extends LoadBalancer.Helper {
|
||||
AutoConfiguredLoadBalancer lb;
|
||||
boolean nsRefreshedByLb;
|
||||
boolean ignoreRefreshNsCheck;
|
||||
|
||||
@Override
|
||||
public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
|
||||
|
@ -1478,6 +1481,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
@Override
|
||||
public void refreshNameResolution() {
|
||||
syncContext.throwIfNotInThisSynchronizationContext();
|
||||
nsRefreshedByLb = true;
|
||||
final class LoadBalancerRefreshNameResolution implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -1488,6 +1492,11 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
syncContext.execute(new LoadBalancerRefreshNameResolution());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void ignoreRefreshNameResolutionCheck() {
|
||||
ignoreRefreshNsCheck = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
|
||||
return createOobChannel(Collections.singletonList(addressGroup), authority);
|
||||
|
@ -1530,6 +1539,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
|
||||
@Override
|
||||
void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
|
||||
// TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
|
||||
// state and refresh name resolution if necessary.
|
||||
handleInternalSubchannelState(newState);
|
||||
oobChannel.handleSubchannelStateChange(newState);
|
||||
}
|
||||
|
@ -1951,9 +1962,18 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
|
||||
@Override
|
||||
void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
|
||||
handleInternalSubchannelState(newState);
|
||||
checkState(listener != null, "listener is null");
|
||||
listener.onSubchannelState(newState);
|
||||
if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
|
||||
if (!helper.ignoreRefreshNsCheck && !helper.nsRefreshedByLb) {
|
||||
logger.log(Level.WARNING,
|
||||
"LoadBalancer should call Helper.refreshNameResolution() to refresh name "
|
||||
+ "resolution if subchannel state becomes TRANSIENT_FAILURE or IDLE. "
|
||||
+ "This will no longer happen automatically in the future releases");
|
||||
refreshAndResetNameResolution();
|
||||
helper.nsRefreshedByLb = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,6 +18,7 @@ package io.grpc.internal;
|
|||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static io.grpc.ConnectivityState.CONNECTING;
|
||||
import static io.grpc.ConnectivityState.IDLE;
|
||||
import static io.grpc.ConnectivityState.SHUTDOWN;
|
||||
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
||||
|
||||
|
@ -84,6 +85,9 @@ final class PickFirstLoadBalancer extends LoadBalancer {
|
|||
if (currentState == SHUTDOWN) {
|
||||
return;
|
||||
}
|
||||
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
|
||||
helper.refreshNameResolution();
|
||||
}
|
||||
|
||||
SubchannelPicker picker;
|
||||
switch (currentState) {
|
||||
|
|
|
@ -94,6 +94,11 @@ public abstract class ForwardingLoadBalancerHelper extends LoadBalancer.Helper {
|
|||
delegate().refreshNameResolution();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void ignoreRefreshNameResolutionCheck() {
|
||||
delegate().ignoreRefreshNameResolutionCheck();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAuthority() {
|
||||
return delegate().getAuthority();
|
||||
|
|
|
@ -139,6 +139,9 @@ final class RoundRobinLoadBalancer extends LoadBalancer {
|
|||
if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) {
|
||||
return;
|
||||
}
|
||||
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
|
||||
helper.refreshNameResolution();
|
||||
}
|
||||
if (stateInfo.getState() == IDLE) {
|
||||
subchannel.requestConnection();
|
||||
}
|
||||
|
|
|
@ -139,6 +139,9 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.logging.Handler;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.LogRecord;
|
||||
import javax.annotation.Nullable;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -277,6 +280,7 @@ public class ManagedChannelImplTest {
|
|||
private boolean requestConnection = true;
|
||||
private BlockingQueue<MockClientTransportInfo> transports;
|
||||
private boolean panicExpected;
|
||||
private final List<LogRecord> logs = new ArrayList<>();
|
||||
@Captor
|
||||
private ArgumentCaptor<ResolvedAddresses> resolvedAddressCaptor;
|
||||
|
||||
|
@ -328,6 +332,22 @@ public class ManagedChannelImplTest {
|
|||
when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
|
||||
when(balancerRpcExecutorPool.getObject())
|
||||
.thenReturn(balancerRpcExecutor.getScheduledExecutorService());
|
||||
Handler handler = new Handler() {
|
||||
@Override
|
||||
public void publish(LogRecord record) {
|
||||
logs.add(record);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws SecurityException {
|
||||
}
|
||||
};
|
||||
ManagedChannelImpl.logger.addHandler(handler);
|
||||
ManagedChannelImpl.logger.setLevel(Level.ALL);
|
||||
|
||||
channelBuilder = new ManagedChannelImplBuilder(TARGET,
|
||||
new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT));
|
||||
|
@ -1539,6 +1559,103 @@ public class ManagedChannelImplTest {
|
|||
timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void subchannelConnectionBroken_noLbRefreshingResolver_logWarningAndTriggeRefresh() {
|
||||
FakeNameResolverFactory nameResolverFactory =
|
||||
new FakeNameResolverFactory.Builder(expectedUri)
|
||||
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
|
||||
.build();
|
||||
channelBuilder.nameResolverFactory(nameResolverFactory);
|
||||
createChannel();
|
||||
FakeNameResolverFactory.FakeNameResolver resolver =
|
||||
Iterables.getOnlyElement(nameResolverFactory.resolvers);
|
||||
assertThat(resolver.refreshCalled).isEqualTo(0);
|
||||
|
||||
Subchannel subchannel =
|
||||
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
|
||||
InternalSubchannel internalSubchannel =
|
||||
(InternalSubchannel) subchannel.getInternalSubchannel();
|
||||
internalSubchannel.obtainActiveTransport();
|
||||
MockClientTransportInfo transportInfo = transports.poll();
|
||||
|
||||
// Break subchannel connection
|
||||
transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unreachable"));
|
||||
LogRecord log = Iterables.getOnlyElement(logs);
|
||||
assertThat(log.getLevel()).isEqualTo(Level.WARNING);
|
||||
assertThat(log.getMessage()).isEqualTo(
|
||||
"LoadBalancer should call Helper.refreshNameResolution() to refresh name resolution if "
|
||||
+ "subchannel state becomes TRANSIENT_FAILURE or IDLE. This will no longer happen "
|
||||
+ "automatically in the future releases");
|
||||
assertThat(resolver.refreshCalled).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void subchannelConnectionBroken_ResolverRefreshedByLb() {
|
||||
FakeNameResolverFactory nameResolverFactory =
|
||||
new FakeNameResolverFactory.Builder(expectedUri)
|
||||
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
|
||||
.build();
|
||||
channelBuilder.nameResolverFactory(nameResolverFactory);
|
||||
createChannel();
|
||||
FakeNameResolverFactory.FakeNameResolver resolver =
|
||||
Iterables.getOnlyElement(nameResolverFactory.resolvers);
|
||||
assertThat(resolver.refreshCalled).isEqualTo(0);
|
||||
ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
|
||||
verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
|
||||
helper = helperCaptor.getValue();
|
||||
|
||||
SubchannelStateListener listener = new SubchannelStateListener() {
|
||||
@Override
|
||||
public void onSubchannelState(ConnectivityStateInfo newState) {
|
||||
// Normal LoadBalancer should refresh name resolution when some subchannel enters
|
||||
// TRANSIENT_FAILURE or IDLE
|
||||
if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
|
||||
helper.refreshNameResolution();
|
||||
}
|
||||
}
|
||||
};
|
||||
Subchannel subchannel =
|
||||
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, listener);
|
||||
InternalSubchannel internalSubchannel =
|
||||
(InternalSubchannel) subchannel.getInternalSubchannel();
|
||||
internalSubchannel.obtainActiveTransport();
|
||||
MockClientTransportInfo transportInfo = transports.poll();
|
||||
|
||||
// Break subchannel connection and simulate load balancer refreshing name resolution
|
||||
transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unreachable"));
|
||||
assertThat(logs).isEmpty();
|
||||
assertThat(resolver.refreshCalled).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void subchannelConnectionBroken_ignoreRefreshNameResolutionCheck_noRefresh() {
|
||||
FakeNameResolverFactory nameResolverFactory =
|
||||
new FakeNameResolverFactory.Builder(expectedUri)
|
||||
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
|
||||
.build();
|
||||
channelBuilder.nameResolverFactory(nameResolverFactory);
|
||||
createChannel();
|
||||
FakeNameResolverFactory.FakeNameResolver resolver =
|
||||
Iterables.getOnlyElement(nameResolverFactory.resolvers);
|
||||
assertThat(resolver.refreshCalled).isEqualTo(0);
|
||||
ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
|
||||
verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
|
||||
helper = helperCaptor.getValue();
|
||||
helper.ignoreRefreshNameResolutionCheck();
|
||||
|
||||
Subchannel subchannel =
|
||||
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
|
||||
InternalSubchannel internalSubchannel =
|
||||
(InternalSubchannel) subchannel.getInternalSubchannel();
|
||||
internalSubchannel.obtainActiveTransport();
|
||||
MockClientTransportInfo transportInfo = transports.poll();
|
||||
|
||||
// Break subchannel connection
|
||||
transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unreachable"));
|
||||
assertThat(logs).isEmpty();
|
||||
assertThat(resolver.refreshCalled).isEqualTo(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void subchannelStringableBeforeStart() {
|
||||
createChannel();
|
||||
|
@ -2095,43 +2212,26 @@ public class ManagedChannelImplTest {
|
|||
.isSameInstanceAs(NameResolverRegistry.getDefaultRegistry());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void refreshNameResolution_whenSubchannelConnectionFailed_notIdle() {
|
||||
subtestNameResolutionRefreshWhenConnectionFailed(false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void refreshNameResolution_whenOobChannelConnectionFailed_notIdle() {
|
||||
subtestNameResolutionRefreshWhenConnectionFailed(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void notRefreshNameResolution_whenSubchannelConnectionFailed_idle() {
|
||||
subtestNameResolutionRefreshWhenConnectionFailed(false, true);
|
||||
subtestNameResolutionRefreshWhenConnectionFailed(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void notRefreshNameResolution_whenOobChannelConnectionFailed_idle() {
|
||||
subtestNameResolutionRefreshWhenConnectionFailed(true, true);
|
||||
subtestNameResolutionRefreshWhenConnectionFailed(true);
|
||||
}
|
||||
|
||||
private void subtestNameResolutionRefreshWhenConnectionFailed(
|
||||
boolean isOobChannel, boolean isIdle) {
|
||||
private void subtestNameResolutionRefreshWhenConnectionFailed(boolean isIdle) {
|
||||
FakeNameResolverFactory nameResolverFactory =
|
||||
new FakeNameResolverFactory.Builder(expectedUri)
|
||||
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
|
||||
.build();
|
||||
channelBuilder.nameResolverFactory(nameResolverFactory);
|
||||
createChannel();
|
||||
if (isOobChannel) {
|
||||
OobChannel oobChannel = (OobChannel) helper.createOobChannel(
|
||||
Collections.singletonList(addressGroup), "oobAuthority");
|
||||
oobChannel.getSubchannel().requestConnection();
|
||||
} else {
|
||||
Subchannel subchannel =
|
||||
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
|
||||
requestConnectionSafely(helper, subchannel);
|
||||
}
|
||||
OobChannel oobChannel = (OobChannel) helper.createOobChannel(
|
||||
Collections.singletonList(addressGroup), "oobAuthority");
|
||||
oobChannel.getSubchannel().requestConnection();
|
||||
|
||||
MockClientTransportInfo transportInfo = transports.poll();
|
||||
assertNotNull(transportInfo);
|
||||
|
|
|
@ -22,6 +22,8 @@ import static io.grpc.ConnectivityState.IDLE;
|
|||
import static io.grpc.ConnectivityState.READY;
|
||||
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.isA;
|
||||
|
@ -160,6 +162,38 @@ public class PickFirstLoadBalancerTest {
|
|||
verify(mockSubchannel, times(2)).requestConnection();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void refreshNameResolutionAfterSubchannelConnectionBroken() {
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
|
||||
verify(mockHelper).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
|
||||
InOrder inOrder = inOrder(mockHelper, mockSubchannel);
|
||||
inOrder.verify(mockSubchannel).start(stateListenerCaptor.capture());
|
||||
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
|
||||
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
|
||||
assertSame(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
|
||||
inOrder.verify(mockSubchannel).requestConnection();
|
||||
|
||||
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
|
||||
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
|
||||
assertNull(pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
|
||||
Status error = Status.UNAUTHENTICATED.withDescription("permission denied");
|
||||
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
|
||||
inOrder.verify(mockHelper).refreshNameResolution();
|
||||
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
|
||||
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
|
||||
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
|
||||
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||
assertSame(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
|
||||
// Simulate receiving go-away so the subchannel transit to IDLE.
|
||||
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
|
||||
inOrder.verify(mockHelper).refreshNameResolution();
|
||||
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
||||
verifyNoMoreInteractions(mockHelper, mockSubchannel);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pickAfterResolvedAndUnchanged() throws Exception {
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
|
@ -225,10 +259,12 @@ public class PickFirstLoadBalancerTest {
|
|||
|
||||
Status error = Status.UNAVAILABLE.withDescription("boom!");
|
||||
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
|
||||
inOrder.verify(mockHelper).refreshNameResolution();
|
||||
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
|
||||
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
|
||||
|
||||
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
|
||||
inOrder.verify(mockHelper).refreshNameResolution();
|
||||
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
|
||||
assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
|
||||
|
||||
|
@ -294,6 +330,7 @@ public class PickFirstLoadBalancerTest {
|
|||
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
|
||||
|
||||
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
|
||||
inOrder.verify(mockHelper).refreshNameResolution();
|
||||
inOrder.verify(mockHelper).updateBalancingState(
|
||||
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
|
||||
|
||||
|
|
|
@ -277,11 +277,13 @@ public class RoundRobinLoadBalancerTest {
|
|||
ConnectivityStateInfo.forTransientFailure(error));
|
||||
assertThat(subchannelStateInfo.value.getState()).isEqualTo(TRANSIENT_FAILURE);
|
||||
assertThat(subchannelStateInfo.value.getStatus()).isEqualTo(error);
|
||||
inOrder.verify(mockHelper).refreshNameResolution();
|
||||
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
|
||||
assertThat(pickerCaptor.getValue()).isInstanceOf(EmptyPicker.class);
|
||||
|
||||
deliverSubchannelState(subchannel,
|
||||
ConnectivityStateInfo.forNonError(IDLE));
|
||||
inOrder.verify(mockHelper).refreshNameResolution();
|
||||
assertThat(subchannelStateInfo.value.getState()).isEqualTo(TRANSIENT_FAILURE);
|
||||
assertThat(subchannelStateInfo.value.getStatus()).isEqualTo(error);
|
||||
|
||||
|
@ -305,9 +307,7 @@ public class RoundRobinLoadBalancerTest {
|
|||
deliverSubchannelState(
|
||||
sc,
|
||||
ConnectivityStateInfo.forTransientFailure(error));
|
||||
deliverSubchannelState(
|
||||
sc,
|
||||
ConnectivityStateInfo.forNonError(IDLE));
|
||||
inOrder.verify(mockHelper).refreshNameResolution();
|
||||
deliverSubchannelState(
|
||||
sc,
|
||||
ConnectivityStateInfo.forNonError(CONNECTING));
|
||||
|
@ -330,6 +330,35 @@ public class RoundRobinLoadBalancerTest {
|
|||
verifyNoMoreInteractions(mockHelper);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void refreshNameResolutionWhenSubchannelConnectionBroken() {
|
||||
InOrder inOrder = inOrder(mockHelper);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(Attributes.EMPTY)
|
||||
.build());
|
||||
|
||||
verify(mockHelper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class));
|
||||
|
||||
// Simulate state transitions for each subchannel individually.
|
||||
for (Subchannel sc : loadBalancer.getSubchannels()) {
|
||||
verify(sc).requestConnection();
|
||||
deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(CONNECTING));
|
||||
Status error = Status.UNKNOWN.withDescription("connection broken");
|
||||
deliverSubchannelState(sc, ConnectivityStateInfo.forTransientFailure(error));
|
||||
inOrder.verify(mockHelper).refreshNameResolution();
|
||||
deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(READY));
|
||||
inOrder.verify(mockHelper).updateBalancingState(eq(READY), isA(ReadyPicker.class));
|
||||
// Simulate receiving go-away so READY subchannels transit to IDLE.
|
||||
deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(IDLE));
|
||||
inOrder.verify(mockHelper).refreshNameResolution();
|
||||
verify(sc, times(2)).requestConnection();
|
||||
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class));
|
||||
}
|
||||
|
||||
verifyNoMoreInteractions(mockHelper);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pickerRoundRobin() throws Exception {
|
||||
Subchannel subchannel = mock(Subchannel.class);
|
||||
|
|
|
@ -222,6 +222,10 @@ final class GrpclbState {
|
|||
if (config.getMode() == Mode.ROUND_ROBIN && newState.getState() == IDLE) {
|
||||
subchannel.requestConnection();
|
||||
}
|
||||
if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
|
||||
helper.refreshNameResolution();
|
||||
}
|
||||
|
||||
AtomicReference<ConnectivityStateInfo> stateInfoRef =
|
||||
subchannel.getAttributes().get(STATE_INFO);
|
||||
// If all RR servers are unhealthy, it's possible that at least one connection is CONNECTING at
|
||||
|
|
|
@ -1003,6 +1003,7 @@ public class GrpclbLoadBalancerTest {
|
|||
verify(subchannel1).requestConnection();
|
||||
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE));
|
||||
verify(subchannel1, times(2)).requestConnection();
|
||||
inOrder.verify(helper).refreshNameResolution();
|
||||
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||
assertThat(logs).containsExactly(
|
||||
"INFO: [grpclb-<api.google.com>] Update balancing state to READY: picks="
|
||||
|
@ -1022,6 +1023,7 @@ public class GrpclbLoadBalancerTest {
|
|||
ConnectivityStateInfo errorState1 =
|
||||
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription("error1"));
|
||||
deliverSubchannelState(subchannel1, errorState1);
|
||||
inOrder.verify(helper).refreshNameResolution();
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
|
||||
// If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING
|
||||
|
@ -1183,7 +1185,9 @@ public class GrpclbLoadBalancerTest {
|
|||
// Switch all subchannels to TRANSIENT_FAILURE, making the general state TRANSIENT_FAILURE too.
|
||||
Status error = Status.UNAVAILABLE.withDescription("error");
|
||||
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error));
|
||||
inOrder.verify(helper).refreshNameResolution();
|
||||
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forTransientFailure(error));
|
||||
inOrder.verify(helper).refreshNameResolution();
|
||||
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
|
||||
assertThat(((RoundRobinPicker) pickerCaptor.getValue()).pickList)
|
||||
.containsExactly(new ErrorEntry(error));
|
||||
|
@ -1191,6 +1195,7 @@ public class GrpclbLoadBalancerTest {
|
|||
// Switch subchannel1 to IDLE, then to CONNECTING, which are ignored since the previous
|
||||
// subchannel state is TRANSIENT_FAILURE. General state is unchanged.
|
||||
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE));
|
||||
inOrder.verify(helper).refreshNameResolution();
|
||||
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
|
||||
|
@ -1549,11 +1554,13 @@ public class GrpclbLoadBalancerTest {
|
|||
lbResponseObserver = lbResponseObserverCaptor.getValue();
|
||||
assertEquals(1, lbRequestObservers.size());
|
||||
lbRequestObserver = lbRequestObservers.poll();
|
||||
inOrder.verify(helper).refreshNameResolution();
|
||||
}
|
||||
if (allSubchannelsBroken) {
|
||||
for (Subchannel subchannel : subchannels) {
|
||||
// A READY subchannel transits to IDLE when receiving a go-away
|
||||
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
|
||||
inOrder.verify(helper).refreshNameResolution();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1566,6 +1573,7 @@ public class GrpclbLoadBalancerTest {
|
|||
// connections are lost
|
||||
for (Subchannel subchannel : subchannels) {
|
||||
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
|
||||
inOrder.verify(helper).refreshNameResolution();
|
||||
}
|
||||
|
||||
// Exit fallback mode or cancel fallback timer when receiving a new server list from balancer
|
||||
|
|
|
@ -211,6 +211,7 @@ public class RlsLoadBalancerTest {
|
|||
|
||||
// search subchannel is down, rescue subchannel is connecting
|
||||
searchSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
|
||||
|
||||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
|
||||
|
||||
|
@ -474,6 +475,11 @@ public class RlsLoadBalancerTest {
|
|||
// no-op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshNameResolution() {
|
||||
// no-op
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAuthority() {
|
||||
return "fake-bigtable.googleapis.com";
|
||||
|
|
|
@ -279,8 +279,10 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
private final class RefreshableHelper extends ForwardingLoadBalancerHelper {
|
||||
private final Helper delegate;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private RefreshableHelper(Helper delegate) {
|
||||
this.delegate = checkNotNull(delegate, "delegate");
|
||||
delegate.ignoreRefreshNameResolutionCheck();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -203,6 +203,9 @@ final class RingHashLoadBalancer extends LoadBalancer {
|
|||
if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) {
|
||||
return;
|
||||
}
|
||||
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
|
||||
helper.refreshNameResolution();
|
||||
}
|
||||
Ref<ConnectivityStateInfo> subchannelStateRef = getSubchannelStateInfoRef(subchannel);
|
||||
|
||||
// Don't proactively reconnect if the subchannel enters IDLE, even if previously was connected.
|
||||
|
|
|
@ -460,6 +460,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
|
||||
resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2));
|
||||
assertThat(resolver.refreshCount).isEqualTo(0);
|
||||
verify(helper).ignoreRefreshNameResolutionCheck();
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
childBalancer.helper.refreshNameResolution();
|
||||
assertThat(resolver.refreshCount).isEqualTo(1);
|
||||
|
@ -519,6 +520,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
assertAddressesEqual(Collections.singletonList(endpoint), childBalancer.addresses);
|
||||
assertThat(resolver.refreshCount).isEqualTo(0);
|
||||
verify(helper).ignoreRefreshNameResolutionCheck();
|
||||
|
||||
childBalancer.helper.refreshNameResolution();
|
||||
assertThat(resolver.refreshCount).isEqualTo(1);
|
||||
|
|
|
@ -233,12 +233,14 @@ public class RingHashLoadBalancerTest {
|
|||
subchannels.get(Collections.singletonList(servers.get(0))),
|
||||
ConnectivityStateInfo.forTransientFailure(
|
||||
Status.UNKNOWN.withDescription("unknown failure")));
|
||||
inOrder.verify(helper).refreshNameResolution();
|
||||
inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class));
|
||||
|
||||
// one in TRANSIENT_FAILURE, one in IDLE
|
||||
deliverSubchannelState(
|
||||
subchannels.get(Collections.singletonList(servers.get(1))),
|
||||
ConnectivityStateInfo.forNonError(IDLE));
|
||||
inOrder.verify(helper).refreshNameResolution();
|
||||
inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
||||
verifyNoMoreInteractions(helper);
|
||||
|
@ -260,6 +262,7 @@ public class RingHashLoadBalancerTest {
|
|||
subchannels.get(Collections.singletonList(servers.get(0))),
|
||||
ConnectivityStateInfo.forTransientFailure(
|
||||
Status.UNAVAILABLE.withDescription("not found")));
|
||||
inOrder.verify(helper).refreshNameResolution();
|
||||
inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
||||
// two in TRANSIENT_FAILURE, two in IDLE
|
||||
|
@ -267,6 +270,7 @@ public class RingHashLoadBalancerTest {
|
|||
subchannels.get(Collections.singletonList(servers.get(1))),
|
||||
ConnectivityStateInfo.forTransientFailure(
|
||||
Status.UNAVAILABLE.withDescription("also not found")));
|
||||
inOrder.verify(helper).refreshNameResolution();
|
||||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
|
||||
|
||||
|
@ -283,6 +287,7 @@ public class RingHashLoadBalancerTest {
|
|||
subchannels.get(Collections.singletonList(servers.get(3))),
|
||||
ConnectivityStateInfo.forTransientFailure(
|
||||
Status.UNAVAILABLE.withDescription("connection lost")));
|
||||
inOrder.verify(helper).refreshNameResolution();
|
||||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
|
||||
|
||||
|
@ -311,6 +316,7 @@ public class RingHashLoadBalancerTest {
|
|||
deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(
|
||||
Status.UNAUTHENTICATED.withDescription("Permission denied")));
|
||||
}
|
||||
verify(helper, times(3)).refreshNameResolution();
|
||||
|
||||
// Stays in IDLE when until there are two or more subchannels in TRANSIENT_FAILURE.
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
|
Loading…
Reference in New Issue