Introduce onResult2 in NameResolver Listener2 that returns Status (#11313)

Introducing NameResolver listener method "Status Listener2::onResult2(ResolutionResult)" that returns Status of the acceptance of the name resolution by the load balancer, and the Name Resolver will call this method for both success and error cases.
This commit is contained in:
Kannan J 2024-07-26 15:43:36 +05:30 committed by GitHub
parent 786523dca4
commit 9ba2f9dec5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 506 additions and 173 deletions

View File

@ -246,6 +246,16 @@ public abstract class NameResolver {
*/
@Override
public abstract void onError(Status error);
/**
* Handles updates on resolved addresses and attributes.
*
* @param resolutionResult the resolved server addresses, attributes, and Service Config.
* @since 1.66
*/
public Status onResult2(ResolutionResult resolutionResult) {
throw new UnsupportedOperationException("Not implemented.");
}
}
/**

View File

@ -330,7 +330,9 @@ public class DnsNameResolver extends NameResolver {
resolutionResultBuilder.setAttributes(result.attributes);
}
}
savedListener.onResult(resolutionResultBuilder.build());
syncContext.execute(() -> {
savedListener.onResult2(resolutionResultBuilder.build());
});
} catch (IOException e) {
savedListener.onError(
Status.UNAVAILABLE.withDescription("Unable to resolve host " + host).withCause(e));

View File

@ -1673,148 +1673,149 @@ final class ManagedChannelImpl extends ManagedChannel implements
public void onResult(final ResolutionResult resolutionResult) {
final class NamesResolved implements Runnable {
@SuppressWarnings("ReferenceEquality")
@Override
public void run() {
if (ManagedChannelImpl.this.nameResolver != resolver) {
return;
}
List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
channelLogger.log(
ChannelLogLevel.DEBUG,
"Resolved address: {0}, config={1}",
servers,
resolutionResult.getAttributes());
if (lastResolutionState != ResolutionState.SUCCESS) {
channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
lastResolutionState = ResolutionState.SUCCESS;
}
ConfigOrError configOrError = resolutionResult.getServiceConfig();
Status status = onResult2(resolutionResult);
ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
.get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
InternalConfigSelector resolvedConfigSelector =
resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
ManagedChannelServiceConfig validServiceConfig =
configOrError != null && configOrError.getConfig() != null
? (ManagedChannelServiceConfig) configOrError.getConfig()
: null;
Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
ManagedChannelServiceConfig effectiveServiceConfig;
if (!lookUpServiceConfig) {
if (validServiceConfig != null) {
channelLogger.log(
ChannelLogLevel.INFO,
"Service config from name resolver discarded by channel settings");
}
effectiveServiceConfig =
defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
if (resolvedConfigSelector != null) {
channelLogger.log(
ChannelLogLevel.INFO,
"Config selector from name resolver discarded by channel settings");
}
realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
} else {
// Try to use config if returned from name resolver
// Otherwise, try to use the default config if available
if (validServiceConfig != null) {
effectiveServiceConfig = validServiceConfig;
if (resolvedConfigSelector != null) {
realChannel.updateConfigSelector(resolvedConfigSelector);
if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
channelLogger.log(
ChannelLogLevel.DEBUG,
"Method configs in service config will be discarded due to presence of"
+ "config-selector");
}
} else {
realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
}
} else if (defaultServiceConfig != null) {
effectiveServiceConfig = defaultServiceConfig;
realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
channelLogger.log(
ChannelLogLevel.INFO,
"Received no service config, using default service config");
} else if (serviceConfigError != null) {
if (!serviceConfigUpdated) {
// First DNS lookup has invalid service config, and cannot fall back to default
channelLogger.log(
ChannelLogLevel.INFO,
"Fallback to error due to invalid first service config without default config");
// This error could be an "inappropriate" control plane error that should not bleed
// through to client code using gRPC. We let them flow through here to the LB as
// we later check for these error codes when investigating pick results in
// GrpcUtil.getTransportFromPickResult().
onError(configOrError.getError());
if (resolutionResultListener != null) {
resolutionResultListener.resolutionAttempted(configOrError.getError());
}
return;
} else {
effectiveServiceConfig = lastServiceConfig;
}
} else {
effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
realChannel.updateConfigSelector(null);
}
if (!effectiveServiceConfig.equals(lastServiceConfig)) {
channelLogger.log(
ChannelLogLevel.INFO,
"Service config changed{0}",
effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
lastServiceConfig = effectiveServiceConfig;
transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
}
try {
// TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
// and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
// lbNeedAddress is not deterministic
serviceConfigUpdated = true;
} catch (RuntimeException re) {
logger.log(
Level.WARNING,
"[" + getLogId() + "] Unexpected exception from parsing service config",
re);
}
}
Attributes effectiveAttrs = resolutionResult.getAttributes();
// Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
Attributes.Builder attrBuilder =
effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
Map<String, ?> healthCheckingConfig =
effectiveServiceConfig.getHealthCheckingConfig();
if (healthCheckingConfig != null) {
attrBuilder
.set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
.build();
}
Attributes attributes = attrBuilder.build();
Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
.build());
// If a listener is provided, let it know if the addresses were accepted.
if (resolutionResultListener != null) {
resolutionResultListener.resolutionAttempted(addressAcceptanceStatus);
}
}
resolutionResultListener.resolutionAttempted(status);
}
}
syncContext.execute(new NamesResolved());
}
@SuppressWarnings("ReferenceEquality")
@Override
public Status onResult2(final ResolutionResult resolutionResult) {
syncContext.throwIfNotInThisSynchronizationContext();
if (ManagedChannelImpl.this.nameResolver != resolver) {
return Status.OK;
}
List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
channelLogger.log(
ChannelLogLevel.DEBUG,
"Resolved address: {0}, config={1}",
servers,
resolutionResult.getAttributes());
if (lastResolutionState != ResolutionState.SUCCESS) {
channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
lastResolutionState = ResolutionState.SUCCESS;
}
ConfigOrError configOrError = resolutionResult.getServiceConfig();
InternalConfigSelector resolvedConfigSelector =
resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
ManagedChannelServiceConfig validServiceConfig =
configOrError != null && configOrError.getConfig() != null
? (ManagedChannelServiceConfig) configOrError.getConfig()
: null;
Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
ManagedChannelServiceConfig effectiveServiceConfig;
if (!lookUpServiceConfig) {
if (validServiceConfig != null) {
channelLogger.log(
ChannelLogLevel.INFO,
"Service config from name resolver discarded by channel settings");
}
effectiveServiceConfig =
defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
if (resolvedConfigSelector != null) {
channelLogger.log(
ChannelLogLevel.INFO,
"Config selector from name resolver discarded by channel settings");
}
realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
} else {
// Try to use config if returned from name resolver
// Otherwise, try to use the default config if available
if (validServiceConfig != null) {
effectiveServiceConfig = validServiceConfig;
if (resolvedConfigSelector != null) {
realChannel.updateConfigSelector(resolvedConfigSelector);
if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
channelLogger.log(
ChannelLogLevel.DEBUG,
"Method configs in service config will be discarded due to presence of"
+ "config-selector");
}
} else {
realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
}
} else if (defaultServiceConfig != null) {
effectiveServiceConfig = defaultServiceConfig;
realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
channelLogger.log(
ChannelLogLevel.INFO,
"Received no service config, using default service config");
} else if (serviceConfigError != null) {
if (!serviceConfigUpdated) {
// First DNS lookup has invalid service config, and cannot fall back to default
channelLogger.log(
ChannelLogLevel.INFO,
"Fallback to error due to invalid first service config without default config");
// This error could be an "inappropriate" control plane error that should not bleed
// through to client code using gRPC. We let them flow through here to the LB as
// we later check for these error codes when investigating pick results in
// GrpcUtil.getTransportFromPickResult().
onError(configOrError.getError());
return configOrError.getError();
} else {
effectiveServiceConfig = lastServiceConfig;
}
} else {
effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
realChannel.updateConfigSelector(null);
}
if (!effectiveServiceConfig.equals(lastServiceConfig)) {
channelLogger.log(
ChannelLogLevel.INFO,
"Service config changed{0}",
effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
lastServiceConfig = effectiveServiceConfig;
transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
}
try {
// TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
// and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
// lbNeedAddress is not deterministic
serviceConfigUpdated = true;
} catch (RuntimeException re) {
logger.log(
Level.WARNING,
"[" + getLogId() + "] Unexpected exception from parsing service config",
re);
}
}
Attributes effectiveAttrs = resolutionResult.getAttributes();
// Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
Attributes.Builder attrBuilder =
effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
Map<String, ?> healthCheckingConfig =
effectiveServiceConfig.getHealthCheckingConfig();
if (healthCheckingConfig != null) {
attrBuilder
.set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
.build();
}
Attributes attributes = attrBuilder.build();
return helper.lb.tryAcceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
.build());
}
return Status.OK;
}
@Override
public void onError(final Status error) {
checkArgument(!error.isOk(), "the error status must not be OK");

View File

@ -95,12 +95,24 @@ final class RetryingNameResolver extends ForwardingNameResolver {
"RetryingNameResolver can only be used once to wrap a NameResolver");
}
// To have retry behavior for name resolvers that haven't migrated to onResult2.
delegateListener.onResult(resolutionResult.toBuilder().setAttributes(
resolutionResult.getAttributes().toBuilder()
.set(RESOLUTION_RESULT_LISTENER_KEY, new ResolutionResultListener()).build())
.build());
}
@Override
public Status onResult2(ResolutionResult resolutionResult) {
Status status = delegateListener.onResult2(resolutionResult);
if (status.isOk()) {
retryScheduler.reset();
} else {
retryScheduler.schedule(new DelayedNameResolverRefresh());
}
return status;
}
@Override
public void onError(Status error) {
delegateListener.onError(error);

View File

@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@ -226,13 +225,7 @@ public class DnsNameResolverTest {
System.getProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY);
// By default the mock listener processes the result successfully.
doAnswer(invocation -> {
ResolutionResult result = invocation.getArgument(0);
syncContext.execute(
() -> result.getAttributes().get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY)
.resolutionAttempted(Status.OK));
return null;
}).when(mockListener).onResult(isA(ResolutionResult.class));
when(mockListener.onResult2(isA(ResolutionResult.class))).thenReturn(Status.OK);
}
@After
@ -319,13 +312,13 @@ public class DnsNameResolverTest {
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
assertAnswerMatches(answer1, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
resolver.refresh();
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener, times(2)).onResult(resultCaptor.capture());
verify(mockListener, times(2)).onResult2(resultCaptor.capture());
assertAnswerMatches(answer2, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
assertEquals(0, fakeExecutor.numPendingTasks());
@ -347,7 +340,7 @@ public class DnsNameResolverTest {
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
assertAnswerMatches(answer, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
assertEquals(0, fakeExecutor.numPendingTasks());
@ -389,7 +382,7 @@ public class DnsNameResolverTest {
resolver.start(mockListener);
assertEquals(0, fakeExecutor.runDueTasks());
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
assertAnswerMatches(answer, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
assertEquals(0, fakeExecutor.numPendingTasks());
@ -418,7 +411,7 @@ public class DnsNameResolverTest {
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
assertAnswerMatches(answer1, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
@ -452,7 +445,7 @@ public class DnsNameResolverTest {
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
assertAnswerMatches(answer, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
@ -487,14 +480,14 @@ public class DnsNameResolverTest {
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
assertAnswerMatches(answer1, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
fakeTicker.advance(ttl + 1, TimeUnit.SECONDS);
resolver.refresh();
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener, times(2)).onResult(resultCaptor.capture());
verify(mockListener, times(2)).onResult2(resultCaptor.capture());
assertAnswerMatches(answer2, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
assertEquals(0, fakeExecutor.numPendingTasks());
@ -531,7 +524,7 @@ public class DnsNameResolverTest {
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
assertAnswerMatches(answer1, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
@ -544,7 +537,7 @@ public class DnsNameResolverTest {
fakeTicker.advance(1, TimeUnit.SECONDS);
resolver.refresh();
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener, times(2)).onResult(resultCaptor.capture());
verify(mockListener, times(2)).onResult2(resultCaptor.capture());
assertAnswerMatches(answer2, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
assertEquals(0, fakeExecutor.numPendingTasks());
@ -575,7 +568,7 @@ public class DnsNameResolverTest {
assertThat(fakeExecutor.runDueTasks()).isEqualTo(1);
ArgumentCaptor<ResolutionResult> ac = ArgumentCaptor.forClass(ResolutionResult.class);
verify(mockListener).onResult(ac.capture());
verify(mockListener).onResult2(ac.capture());
verifyNoMoreInteractions(mockListener);
assertThat(ac.getValue().getAddresses()).isEmpty();
assertThat(ac.getValue().getServiceConfig()).isNull();
@ -588,12 +581,7 @@ public class DnsNameResolverTest {
// Load balancer rejects the empty addresses.
@Test
public void resolve_emptyResult_notAccepted() throws Exception {
doAnswer(invocation -> {
ResolutionResult result = invocation.getArgument(0);
result.getAttributes().get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY)
.resolutionAttempted(Status.UNAVAILABLE);
return null;
}).when(mockListener).onResult(isA(ResolutionResult.class));
when(mockListener.onResult2(isA(ResolutionResult.class))).thenReturn(Status.UNAVAILABLE);
DnsNameResolver.enableTxt = true;
RetryingNameResolver resolver = newResolver("dns:///addr.fake:1234", 443);
@ -614,7 +602,7 @@ public class DnsNameResolverTest {
syncContext.execute(() -> assertThat(fakeExecutor.runDueTasks()).isEqualTo(1));
ArgumentCaptor<ResolutionResult> ac = ArgumentCaptor.forClass(ResolutionResult.class);
verify(mockListener).onResult(ac.capture());
verify(mockListener).onResult2(ac.capture());
verifyNoMoreInteractions(mockListener);
assertThat(ac.getValue().getAddresses()).isEmpty();
assertThat(ac.getValue().getServiceConfig()).isNull();
@ -640,7 +628,7 @@ public class DnsNameResolverTest {
dnsResolver.setResourceResolver(null);
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
ResolutionResult result = resultCaptor.getValue();
InetSocketAddress resolvedBackendAddr =
(InetSocketAddress) Iterables.getOnlyElement(
@ -712,7 +700,7 @@ public class DnsNameResolverTest {
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
ResolutionResult result = resultCaptor.getValue();
InetSocketAddress resolvedBackendAddr =
(InetSocketAddress) Iterables.getOnlyElement(
@ -770,7 +758,7 @@ public class DnsNameResolverTest {
dnsResolver.setResourceResolver(mockResourceResolver);
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
ResolutionResult result = resultCaptor.getValue();
InetSocketAddress resolvedBackendAddr =
(InetSocketAddress) Iterables.getOnlyElement(
@ -802,7 +790,7 @@ public class DnsNameResolverTest {
dnsResolver.setResourceResolver(mockResourceResolver);
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
ResolutionResult result = resultCaptor.getValue();
InetSocketAddress resolvedBackendAddr =
(InetSocketAddress) Iterables.getOnlyElement(
@ -870,7 +858,7 @@ public class DnsNameResolverTest {
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
List<EquivalentAddressGroup> result = resultCaptor.getValue().getAddresses();
assertThat(result).hasSize(1);
EquivalentAddressGroup eag = result.get(0);

View File

@ -1054,6 +1054,79 @@ public class ManagedChannelImplTest {
verifyNoMoreInteractions(mockLoadBalancer);
}
@Test
public void noMoreCallbackAfterLoadBalancerShutdown_configError() throws InterruptedException {
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build();
channelBuilder.nameResolverFactory(nameResolverFactory);
Status resolutionError = Status.UNAVAILABLE.withDescription("Resolution failed");
createChannel();
FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
verify(mockLoadBalancer).acceptResolvedAddresses(resolvedAddressCaptor.capture());
assertThat(resolvedAddressCaptor.getValue().getAddresses()).containsExactly(addressGroup);
SubchannelStateListener stateListener1 = mock(SubchannelStateListener.class);
SubchannelStateListener stateListener2 = mock(SubchannelStateListener.class);
Subchannel subchannel1 =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, stateListener1);
Subchannel subchannel2 =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, stateListener2);
requestConnectionSafely(helper, subchannel1);
requestConnectionSafely(helper, subchannel2);
verify(mockTransportFactory, times(2))
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
MockClientTransportInfo transportInfo1 = transports.poll();
MockClientTransportInfo transportInfo2 = transports.poll();
// LoadBalancer receives all sorts of callbacks
transportInfo1.listener.transportReady();
verify(stateListener1, times(2)).onSubchannelState(stateInfoCaptor.capture());
assertSame(CONNECTING, stateInfoCaptor.getAllValues().get(0).getState());
assertSame(READY, stateInfoCaptor.getAllValues().get(1).getState());
verify(stateListener2).onSubchannelState(stateInfoCaptor.capture());
assertSame(CONNECTING, stateInfoCaptor.getValue().getState());
resolver.listener.onError(resolutionError);
verify(mockLoadBalancer).handleNameResolutionError(resolutionError);
verifyNoMoreInteractions(mockLoadBalancer);
channel.shutdown();
verify(mockLoadBalancer).shutdown();
verifyNoMoreInteractions(stateListener1, stateListener2);
// LoadBalancer will normally shutdown all subchannels
shutdownSafely(helper, subchannel1);
shutdownSafely(helper, subchannel2);
// Since subchannels are shutdown, SubchannelStateListeners will only get SHUTDOWN regardless of
// the transport states.
transportInfo1.listener.transportShutdown(Status.UNAVAILABLE);
transportInfo2.listener.transportReady();
verify(stateListener1).onSubchannelState(ConnectivityStateInfo.forNonError(SHUTDOWN));
verify(stateListener2).onSubchannelState(ConnectivityStateInfo.forNonError(SHUTDOWN));
verifyNoMoreInteractions(stateListener1, stateListener2);
// No more callback should be delivered to LoadBalancer after it's shut down
resolver.listener.onResult(
ResolutionResult.newBuilder()
.setAddresses(new ArrayList<>())
.setServiceConfig(
ConfigOrError.fromError(Status.UNAVAILABLE.withDescription("Resolution failed")))
.build());
Thread.sleep(1100);
assertThat(timer.getPendingTasks()).isEmpty();
resolver.resolved();
verifyNoMoreInteractions(mockLoadBalancer);
}
@Test
public void interceptor() throws Exception {
final AtomicLong atomic = new AtomicLong();
@ -3138,6 +3211,48 @@ public class ManagedChannelImplTest {
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
}
@Test
public void channelTracing_nameResolvedEvent_zeorAndNonzeroBackends_usesListener2onResult2()
throws Exception {
timer.forwardNanos(1234);
channelBuilder.maxTraceEvents(10);
List<EquivalentAddressGroup> servers = new ArrayList<>();
servers.add(new EquivalentAddressGroup(socketAddress));
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build();
channelBuilder.nameResolverFactory(nameResolverFactory);
createChannel();
int prevSize = getStats(channel).channelTrace.events.size();
ResolutionResult resolutionResult1 = ResolutionResult.newBuilder()
.setAddresses(Collections.singletonList(
new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
.build();
channel.syncContext.execute(
() -> nameResolverFactory.resolvers.get(0).listener.onResult2(resolutionResult1));
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
prevSize = getStats(channel).channelTrace.events.size();
nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
prevSize = getStats(channel).channelTrace.events.size();
nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
prevSize = getStats(channel).channelTrace.events.size();
ResolutionResult resolutionResult2 = ResolutionResult.newBuilder()
.setAddresses(Collections.singletonList(
new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
.build();
channel.syncContext.execute(
() -> nameResolverFactory.resolvers.get(0).listener.onResult2(resolutionResult2));
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
}
@Test
public void channelTracing_serviceConfigChange() throws Exception {
timer.forwardNanos(1234);
@ -3197,6 +3312,69 @@ public class ManagedChannelImplTest {
.build());
}
@Test
public void channelTracing_serviceConfigChange_usesListener2OnResult2() throws Exception {
timer.forwardNanos(1234);
channelBuilder.maxTraceEvents(10);
List<EquivalentAddressGroup> servers = new ArrayList<>();
servers.add(new EquivalentAddressGroup(socketAddress));
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build();
channelBuilder.nameResolverFactory(nameResolverFactory);
createChannel();
int prevSize = getStats(channel).channelTrace.events.size();
ManagedChannelServiceConfig mcsc1 = createManagedChannelServiceConfig(
ImmutableMap.<String, Object>of(),
new PolicySelection(
mockLoadBalancerProvider, null));
ResolutionResult resolutionResult1 = ResolutionResult.newBuilder()
.setAddresses(Collections.singletonList(
new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
.setServiceConfig(ConfigOrError.fromConfig(mcsc1))
.build();
channel.syncContext.execute(() ->
nameResolverFactory.resolvers.get(0).listener.onResult2(resolutionResult1));
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
assertThat(getStats(channel).channelTrace.events.get(prevSize))
.isEqualTo(new ChannelTrace.Event.Builder()
.setDescription("Service config changed")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timer.getTicker().read())
.build());
prevSize = getStats(channel).channelTrace.events.size();
ResolutionResult resolutionResult2 = ResolutionResult.newBuilder().setAddresses(
Collections.singletonList(
new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
.setServiceConfig(ConfigOrError.fromConfig(mcsc1))
.build();
channel.syncContext.execute(() ->
nameResolverFactory.resolvers.get(0).listener.onResult(resolutionResult2));
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
prevSize = getStats(channel).channelTrace.events.size();
timer.forwardNanos(1234);
ResolutionResult resolutionResult3 = ResolutionResult.newBuilder()
.setAddresses(Collections.singletonList(
new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
.setServiceConfig(ConfigOrError.fromConfig(ManagedChannelServiceConfig.empty()))
.build();
channel.syncContext.execute(() ->
nameResolverFactory.resolvers.get(0).listener.onResult(resolutionResult3));
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
assertThat(getStats(channel).channelTrace.events.get(prevSize))
.isEqualTo(new ChannelTrace.Event.Builder()
.setDescription("Service config changed")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timer.getTicker().read())
.build());
}
@Test
public void channelTracing_stateChangeEvent() throws Exception {
channelBuilder.maxTraceEvents(10);
@ -3857,6 +4035,120 @@ public class ManagedChannelImplTest {
mychannel.shutdownNow();
}
@Test
public void badServiceConfigIsRecoverable_usesListener2OnResult2() throws Exception {
final List<EquivalentAddressGroup> addresses =
ImmutableList.of(new EquivalentAddressGroup(new SocketAddress() {}));
final class FakeNameResolver extends NameResolver {
Listener2 listener;
private final SynchronizationContext syncContext;
FakeNameResolver(Args args) {
this.syncContext = args.getSynchronizationContext();
}
@Override
public String getServiceAuthority() {
return "also fake";
}
@Override
public void start(Listener2 listener) {
this.listener = listener;
syncContext.execute(() ->
listener.onResult2(
ResolutionResult.newBuilder()
.setAddresses(addresses)
.setServiceConfig(
ConfigOrError.fromError(
Status.INTERNAL.withDescription("kaboom is invalid")))
.build()));
}
@Override
public void shutdown() {}
}
final class FakeNameResolverFactory2 extends NameResolver.Factory {
FakeNameResolver resolver;
ManagedChannelImpl managedChannel;
SynchronizationContext syncContext;
@Nullable
@Override
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
syncContext = args.getSynchronizationContext();
return (resolver = new FakeNameResolver(args));
}
@Override
public String getDefaultScheme() {
return "fake";
}
}
FakeNameResolverFactory2 factory = new FakeNameResolverFactory2();
ManagedChannelImplBuilder customBuilder = new ManagedChannelImplBuilder(TARGET,
new ClientTransportFactoryBuilder() {
@Override
public ClientTransportFactory buildClientTransportFactory() {
return mockTransportFactory;
}
},
null);
when(mockTransportFactory.getSupportedSocketAddressTypes()).thenReturn(Collections.singleton(
InetSocketAddress.class));
customBuilder.executorPool = executorPool;
customBuilder.channelz = channelz;
ManagedChannel mychannel = customBuilder.nameResolverFactory(factory).build();
ClientCall<Void, Void> call1 =
mychannel.newCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT);
ListenableFuture<Void> future1 = ClientCalls.futureUnaryCall(call1, null);
executor.runDueTasks();
try {
future1.get(1, TimeUnit.SECONDS);
Assert.fail();
} catch (ExecutionException e) {
assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("kaboom");
}
// ok the service config is bad, let's fix it.
Map<String, Object> rawServiceConfig =
parseConfig("{\"loadBalancingConfig\": [{\"round_robin\": {}}]}");
Object fakeLbConfig = new Object();
PolicySelection lbConfigs =
new PolicySelection(
mockLoadBalancerProvider, fakeLbConfig);
mockLoadBalancerProvider.parseLoadBalancingPolicyConfig(rawServiceConfig);
ManagedChannelServiceConfig managedChannelServiceConfig =
createManagedChannelServiceConfig(rawServiceConfig, lbConfigs);
factory.syncContext.execute(() ->
factory.resolver.listener.onResult2(
ResolutionResult.newBuilder()
.setAddresses(addresses)
.setServiceConfig(ConfigOrError.fromConfig(managedChannelServiceConfig))
.build()));
ClientCall<Void, Void> call2 = mychannel.newCall(
TestMethodDescriptors.voidMethod(),
CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS));
ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null);
timer.forwardTime(1234, TimeUnit.SECONDS);
executor.runDueTasks();
try {
future2.get();
Assert.fail();
} catch (ExecutionException e) {
assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("deadline");
}
mychannel.shutdownNow();
}
@Test
public void nameResolverArgsPropagation() {
final AtomicReference<NameResolver.Args> capturedArgs = new AtomicReference<>();
@ -4518,7 +4810,7 @@ public class ManagedChannelImplTest {
}
assertEquals(DEFAULT_PORT, args.getDefaultPort());
FakeNameResolverFactory.FakeNameResolver resolver =
new FakeNameResolverFactory.FakeNameResolver(targetUri, error);
new FakeNameResolverFactory.FakeNameResolver(targetUri, error, args);
resolvers.add(resolver);
return resolver;
}
@ -4546,14 +4838,16 @@ public class ManagedChannelImplTest {
final class FakeNameResolver extends NameResolver {
final URI targetUri;
final SynchronizationContext syncContext;
Listener2 listener;
boolean shutdown;
int refreshCalled;
Status error;
FakeNameResolver(URI targetUri, Status error) {
FakeNameResolver(URI targetUri, Status error, Args args) {
this.targetUri = targetUri;
this.error = error;
syncContext = args.getSynchronizationContext();
}
@Override public String getServiceAuthority() {
@ -4585,7 +4879,7 @@ public class ManagedChannelImplTest {
if (configOrError != null) {
builder.setServiceConfig(configOrError);
}
listener.onResult(builder.build());
syncContext.execute(() -> listener.onResult(builder.build()));
}
@Override public void shutdown() {

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.grpc.NameResolver;
import io.grpc.NameResolver.Listener2;
@ -79,7 +80,7 @@ public class RetryingNameResolverTest {
// Make sure the ResolutionResultListener callback is added to the ResolutionResult attributes,
// and the retry scheduler is reset since the name resolution was successful.
@Test
public void onResult_sucess() {
public void onResult_success() {
retryingNameResolver.start(mockListener);
verify(mockNameResolver).start(listenerCaptor.capture());
@ -94,6 +95,18 @@ public class RetryingNameResolverTest {
verify(mockRetryScheduler).reset();
}
@Test
public void onResult2_sucesss() {
when(mockListener.onResult2(isA(ResolutionResult.class))).thenReturn(Status.OK);
retryingNameResolver.start(mockListener);
verify(mockNameResolver).start(listenerCaptor.capture());
assertThat(listenerCaptor.getValue().onResult2(ResolutionResult.newBuilder().build()))
.isEqualTo(Status.OK);
verify(mockRetryScheduler).reset();
}
// Make sure the ResolutionResultListener callback is added to the ResolutionResult attributes,
// and that a retry gets scheduled when the resolution results are rejected.
@Test
@ -112,6 +125,19 @@ public class RetryingNameResolverTest {
verify(mockRetryScheduler).schedule(isA(Runnable.class));
}
// Make sure that a retry gets scheduled when the resolution results are rejected.
@Test
public void onResult2_failure() {
when(mockListener.onResult2(isA(ResolutionResult.class))).thenReturn(Status.UNAVAILABLE);
retryingNameResolver.start(mockListener);
verify(mockNameResolver).start(listenerCaptor.capture());
assertThat(listenerCaptor.getValue().onResult2(ResolutionResult.newBuilder().build()))
.isEqualTo(Status.UNAVAILABLE);
verify(mockRetryScheduler).schedule(isA(Runnable.class));
}
// Wrapping a NameResolver more than once is a misconfiguration.
@Test
public void onResult_failure_doubleWrapped() {

View File

@ -152,7 +152,7 @@ public class GrpclbNameResolverTest {
resolver.start(mockListener);
assertThat(fakeClock.runDueTasks()).isEqualTo(1);
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
ResolutionResult result = resultCaptor.getValue();
assertThat(result.getAddresses()).isEmpty();
assertThat(result.getAttributes()).isEqualTo(Attributes.EMPTY);
@ -192,7 +192,7 @@ public class GrpclbNameResolverTest {
resolver.start(mockListener);
assertThat(fakeClock.runDueTasks()).isEqualTo(1);
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
ResolutionResult result = resultCaptor.getValue();
InetSocketAddress resolvedBackendAddr =
(InetSocketAddress) Iterables.getOnlyElement(
@ -225,7 +225,7 @@ public class GrpclbNameResolverTest {
resolver.start(mockListener);
assertThat(fakeClock.runDueTasks()).isEqualTo(1);
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
ResolutionResult result = resultCaptor.getValue();
assertThat(result.getAddresses())
.containsExactly(
@ -272,7 +272,7 @@ public class GrpclbNameResolverTest {
resolver.start(mockListener);
assertThat(fakeClock.runDueTasks()).isEqualTo(1);
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
ResolutionResult result = resultCaptor.getValue();
assertThat(result.getAddresses()).isEmpty();
EquivalentAddressGroup resolvedBalancerAddr =
@ -306,7 +306,7 @@ public class GrpclbNameResolverTest {
resolver.start(mockListener);
assertThat(fakeClock.runDueTasks()).isEqualTo(1);
verify(mockListener).onResult(resultCaptor.capture());
verify(mockListener).onResult2(resultCaptor.capture());
ResolutionResult result = resultCaptor.getValue();
InetSocketAddress resolvedBackendAddr =