core: deprecate NR.L and add NR.Results

This commit is contained in:
Carl Mastrangelo 2019-03-22 16:38:28 -07:00 committed by GitHub
parent 1eb6fc523e
commit f4a31ec62d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 384 additions and 115 deletions

View File

@ -20,10 +20,13 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@ -43,9 +46,9 @@ import javax.annotation.concurrent.ThreadSafe;
* {@link #refresh()}.
*
* <p>Implementations <strong>don't need to be thread-safe</strong>. All methods are guaranteed to
* be called sequentially. Additionally, all methods that have side-effects, i.e., {@link #start},
* {@link #shutdown} and {@link #refresh} are called from the same {@link SynchronizationContext} as
* returned by {@link Helper#getSynchronizationContext}.
* be called sequentially. Additionally, all methods that have side-effects, i.e.,
* {@link #start(Observer)}, {@link #shutdown} and {@link #refresh} are called from the same
* {@link SynchronizationContext} as returned by {@link Helper#getSynchronizationContext}.
*
* @since 1.0.0
*/
@ -68,9 +71,23 @@ public abstract class NameResolver {
* Starts the resolution.
*
* @param listener used to receive updates on the target
* @deprecated override {@link #start(Observer)} instead.
* @since 1.0.0
*/
public abstract void start(Listener listener);
@Deprecated
public void start(Listener listener) {
throw new UnsupportedOperationException("Not implemented");
}
/**
* Starts the resolution. This method will become abstract in 1.21.0.
*
* @param observer used to receive updates on the target
* @since 1.20.0
*/
public void start(Observer observer) {
start((Listener) observer);
}
/**
* Stops the resolution. Updates to the Listener will stop.
@ -180,10 +197,12 @@ public abstract class NameResolver {
*
* <p>All methods are expected to return quickly.
*
* @deprecated use {@link Observer} instead.
* @since 1.0.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
@ThreadSafe
@Deprecated
public interface Listener {
/**
* Handles updates on resolved addresses and attributes.
@ -207,6 +226,44 @@ public abstract class NameResolver {
void onError(Status error);
}
/**
* Receives address updates.
*
* <p>All methods are expected to return quickly.
*
* @since 1.20.0
*/
public abstract static class Observer implements Listener {
/**
* @deprecated This will be removed in 1.21.0
*/
@Override
@Deprecated
public final void onAddresses(
List<EquivalentAddressGroup> servers, @ResolutionResultAttr Attributes attributes) {
onResult(ResolutionResult.newBuilder().setServers(servers).setAttributes(attributes).build());
}
/**
* Handles updates on resolved addresses and attributes. If
* {@link ResolutionResult#getServers()} is empty, {@link #onError(Status)} will be called.
*
* @param resolutionResult the resolved server addresses, attributes, and Service Config.
* @since 1.20.0
*/
public abstract void onResult(ResolutionResult resolutionResult);
/**
* Handles an error from the resolver. The observer is responsible for eventually invoking
* {@link NameResolver#refresh()} to re-attempt resolution.
*
* @param error a non-OK status
* @since 1.20.0
*/
@Override
public abstract void onError(Status error);
}
/**
* Annotation for name resolution result attributes. It follows the annotation semantics defined
* by {@link Attributes}.
@ -239,8 +296,8 @@ public abstract class NameResolver {
public abstract ProxyDetector getProxyDetector();
/**
* Returns the {@link SynchronizationContext} where {@link #start}, {@link #shutdown} and {@link
* #refresh} are run from.
* Returns the {@link SynchronizationContext} where {@link #start(Observer)}, {@link #shutdown}
* and {@link #refresh} are run from.
*
* @since 1.20.0
*/
@ -252,7 +309,8 @@ public abstract class NameResolver {
* Parses and validates the service configuration chosen by the name resolver. This will
* return a {@link ConfigOrError} which contains either the successfully parsed config, or the
* {@link Status} representing the failure to parse. Implementations are expected to not throw
* exceptions but return a Status representing the failure.
* exceptions but return a Status representing the failure. The value inside the
* {@link ConfigOrError} should implement {@link Object#equals()} and {@link Object#hashCode()}.
*
* @param rawServiceConfig The {@link Map} representation of the service config
* @return a tuple of the fully parsed and validated channel configuration, else the Status.
@ -350,4 +408,143 @@ public abstract class NameResolver {
}
}
}
/**
* Represents the results from a Name Resolver.
*
* @since 1.20.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
public static final class ResolutionResult {
private final List<EquivalentAddressGroup> servers;
@ResolutionResultAttr
private final Attributes attributes;
@Nullable
private final Object serviceConfig;
ResolutionResult(
List<EquivalentAddressGroup> servers,
@ResolutionResultAttr Attributes attributes,
Object serviceConfig) {
this.servers = Collections.unmodifiableList(new ArrayList<>(servers));
this.attributes = checkNotNull(attributes, "attributes");
this.serviceConfig = serviceConfig;
}
/**
* Constructs a new builder of a name resolution result.
*
* @since 1.20.0
*/
public static Builder newBuilder() {
return new Builder();
}
/**
* Gets the servers resolved by name resolution.
*
* @since 1.20.0
*/
public List<EquivalentAddressGroup> getServers() {
return servers;
}
/**
* Gets the attributes associated with the servers resolved by name resolution.
*
* @since 1.20.0
*/
@ResolutionResultAttr
public Attributes getAttributes() {
return attributes;
}
/**
* Gets the Service Config parsed by {@link NameResolver.Helper#parseServiceConfig(Map)}.
*
* @since 1.20.0
*/
@Nullable
public Object getServiceConfig() {
return serviceConfig;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("servers", servers)
.add("attributes", attributes)
.add("serviceConfig", serviceConfig)
.toString();
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof ResolutionResult)) {
return false;
}
ResolutionResult that = (ResolutionResult) obj;
return Objects.equal(this.servers, that.servers)
&& Objects.equal(this.attributes, that.attributes)
&& Objects.equal(this.serviceConfig, that.serviceConfig);
}
@Override
public int hashCode() {
return Objects.hashCode(servers, attributes, serviceConfig);
}
/**
* A builder for {@link ResolutionResult}.
*
* @since 1.20.0
*/
public static final class Builder {
private List<EquivalentAddressGroup> servers = Collections.emptyList();
private Attributes attributes = Attributes.EMPTY;
@Nullable
private Object serviceConfig;
Builder() {}
/**
* Sets the servers resolved by name resolution.
*
* @since 1.20.0
*/
public Builder setServers(List<EquivalentAddressGroup> servers) {
this.servers = servers;
return this;
}
/**
* Sets the attributes for the servers resolved by name resolution.
*
* @since 1.20.0
*/
public Builder setAttributes(Attributes attributes) {
this.attributes = attributes;
return this;
}
/**
* Sets the Service Config parsed by {@link NameResolver.Helper#parseServiceConfig(Map)}.
*
* @since 1.20.0
*/
public Builder setServiceConfig(@Nullable Object serviceConfig) {
this.serviceConfig = serviceConfig;
return this;
}
/**
* Constructs a new {@link ResolutionResult} from this builder.
*
* @since 1.20.0
*/
public ResolutionResult build() {
return new ResolutionResult(servers, attributes, serviceConfig);
}
}
}
}

View File

@ -603,10 +603,12 @@ public abstract class AbstractManagedChannelImplBuilder
}
@Override
public void start(final Listener listener) {
listener.onAddresses(
Collections.singletonList(new EquivalentAddressGroup(address)),
Attributes.EMPTY);
public void start(Observer observer) {
observer.onResult(
ResolutionResult.newBuilder()
.setServers(Collections.singletonList(new EquivalentAddressGroup(address)))
.setAttributes(Attributes.EMPTY)
.build());
}
@Override

View File

@ -60,7 +60,7 @@ import javax.annotation.Nullable;
* A DNS-based {@link NameResolver}.
*
* <p>Each {@code A} or {@code AAAA} record emits an {@link EquivalentAddressGroup} in the list
* passed to {@link NameResolver.Listener#onAddresses(List, Attributes)}
* passed to {@link NameResolver.Observer#onResult(ResolutionResult)}.
*
* @see DnsNameResolverProvider
*/
@ -150,9 +150,9 @@ final class DnsNameResolver extends NameResolver {
private Executor executor;
private boolean resolving;
// The field must be accessed from syncContext, although the methods on a Listener can be called
// The field must be accessed from syncContext, although the methods on an Observer can be called
// from any thread.
private Listener listener;
private NameResolver.Observer observer;
DnsNameResolver(@Nullable String nsAuthority, String name, Helper helper,
Resource<Executor> executorResource, Stopwatch stopwatch, boolean isAndroid) {
@ -185,24 +185,24 @@ final class DnsNameResolver extends NameResolver {
}
@Override
public void start(Listener listener) {
Preconditions.checkState(this.listener == null, "already started");
public void start(Observer observer) {
Preconditions.checkState(this.observer == null, "already started");
executor = SharedResourceHolder.get(executorResource);
this.listener = Preconditions.checkNotNull(listener, "listener");
this.observer = Preconditions.checkNotNull(observer, "observer");
resolve();
}
@Override
public void refresh() {
Preconditions.checkState(listener != null, "not started");
Preconditions.checkState(observer != null, "not started");
resolve();
}
private final class Resolve implements Runnable {
private final Listener savedListener;
private final Observer savedObserver;
Resolve(Listener savedListener) {
this.savedListener = Preconditions.checkNotNull(savedListener, "savedListener");
Resolve(Observer savedObserver) {
this.savedObserver = Preconditions.checkNotNull(savedObserver, "savedObserver");
}
@Override
@ -230,7 +230,7 @@ final class DnsNameResolver extends NameResolver {
try {
proxiedAddr = proxyDetector.proxyFor(destination);
} catch (IOException e) {
savedListener.onError(
savedObserver.onError(
Status.UNAVAILABLE.withDescription("Unable to resolve host " + host).withCause(e));
return;
}
@ -239,7 +239,12 @@ final class DnsNameResolver extends NameResolver {
logger.finer("Using proxy address " + proxiedAddr);
}
EquivalentAddressGroup server = new EquivalentAddressGroup(proxiedAddr);
savedListener.onAddresses(Collections.singletonList(server), Attributes.EMPTY);
ResolutionResult resolutionResult =
ResolutionResult.newBuilder()
.setServers(Collections.singletonList(server))
.setAttributes(Attributes.EMPTY)
.build();
savedObserver.onResult(resolutionResult);
return;
}
@ -269,7 +274,7 @@ final class DnsNameResolver extends NameResolver {
logger.finer("Found DNS results " + resolutionResults + " for " + host);
}
} catch (Exception e) {
savedListener.onError(
savedObserver.onError(
Status.UNAVAILABLE.withDescription("Unable to resolve host " + host).withCause(e));
return;
}
@ -280,7 +285,7 @@ final class DnsNameResolver extends NameResolver {
}
servers.addAll(resolutionResults.balancerAddresses);
if (servers.isEmpty()) {
savedListener.onError(Status.UNAVAILABLE.withDescription(
savedObserver.onError(Status.UNAVAILABLE.withDescription(
"No DNS backend or balancer addresses found for " + host));
return;
}
@ -291,7 +296,7 @@ final class DnsNameResolver extends NameResolver {
parseServiceConfig(resolutionResults.txtRecords, random, getLocalHostname());
if (serviceConfig != null) {
if (serviceConfig.getError() != null) {
savedListener.onError(serviceConfig.getError());
savedObserver.onError(serviceConfig.getError());
return;
} else {
attrs.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig.getConfig());
@ -300,7 +305,9 @@ final class DnsNameResolver extends NameResolver {
} else {
logger.log(Level.FINE, "No TXT records found for {0}", new Object[]{host});
}
savedListener.onAddresses(servers, attrs.build());
ResolutionResult resolutionResult =
ResolutionResult.newBuilder().setServers(servers).setAttributes(attrs.build()).build();
savedObserver.onResult(resolutionResult);
}
}
@ -338,7 +345,7 @@ final class DnsNameResolver extends NameResolver {
return;
}
resolving = true;
executor.execute(new Resolve(listener));
executor.execute(new Resolve(observer));
}
private boolean cacheRefreshRequired() {

View File

@ -38,10 +38,16 @@ abstract class ForwardingNameResolver extends NameResolver {
}
@Override
@Deprecated
public void start(Listener listener) {
delegate.start(listener);
}
@Override
public void start(Observer observer) {
delegate.start(observer);
}
@Override
public void shutdown() {
delegate.shutdown();

View File

@ -60,6 +60,7 @@ import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.NameResolver;
import io.grpc.NameResolver.ResolutionResult;
import io.grpc.ProxyDetector;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
@ -372,8 +373,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
// may throw. We don't want to confuse our state, even if we will enter panic mode.
this.lbHelper = lbHelper;
NameResolverListenerImpl listener = new NameResolverListenerImpl(lbHelper, nameResolver);
nameResolver.start(listener);
NameResolverObserver observer = new NameResolverObserver(lbHelper, nameResolver);
nameResolver.start(observer);
nameResolverStarted = true;
}
@ -1301,22 +1302,24 @@ final class ManagedChannelImpl extends ManagedChannel implements
}
}
private class NameResolverListenerImpl implements NameResolver.Listener {
private final class NameResolverObserver extends NameResolver.Observer {
final LbHelperImpl helper;
final NameResolver resolver;
NameResolverListenerImpl(LbHelperImpl helperImpl, NameResolver resolver) {
NameResolverObserver(LbHelperImpl helperImpl, NameResolver resolver) {
this.helper = checkNotNull(helperImpl, "helperImpl");
this.resolver = checkNotNull(resolver, "resolver");
}
@Override
public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes attrs) {
public void onResult(final ResolutionResult resolutionResult) {
final class NamesResolved implements Runnable {
@SuppressWarnings("ReferenceEquality")
@Override
public void run() {
List<EquivalentAddressGroup> servers = resolutionResult.getServers();
Attributes attrs = resolutionResult.getAttributes();
channelLogger.log(
ChannelLogLevel.DEBUG, "Resolved address: {0}, config={1}", servers, attrs);
@ -1371,7 +1374,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
}
// Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
if (NameResolverListenerImpl.this.helper == ManagedChannelImpl.this.lbHelper) {
if (NameResolverObserver.this.helper == ManagedChannelImpl.this.lbHelper) {
if (servers.isEmpty() && !helper.lb.canHandleEmptyAddressListFromNameResolution()) {
handleErrorInSyncContext(Status.UNAVAILABLE.withDescription(
"Name resolver " + resolver + " returned an empty list"));
@ -1412,7 +1415,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
haveBackends = false;
}
// Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
if (NameResolverObserver.this.helper != ManagedChannelImpl.this.lbHelper) {
return;
}
helper.lb.handleNameResolutionError(error);

View File

@ -24,7 +24,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -37,11 +36,11 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.net.InetAddresses;
import com.google.common.testing.FakeTicker;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.NameResolver;
import io.grpc.NameResolver.Helper.ConfigOrError;
import io.grpc.NameResolver.ResolutionResult;
import io.grpc.ProxyDetector;
import io.grpc.Status;
import io.grpc.Status.Code;
@ -141,9 +140,9 @@ public class DnsNameResolverTest {
};
@Mock
private NameResolver.Listener mockListener;
private NameResolver.Observer mockObserver;
@Captor
private ArgumentCaptor<List<EquivalentAddressGroup>> resultCaptor;
private ArgumentCaptor<ResolutionResult> resultCaptor;
@Nullable
private String networkaddressCacheTtlPropertyValue;
@Mock
@ -294,15 +293,15 @@ public class DnsNameResolverTest {
when(mockResolver.resolveAddress(anyString())).thenReturn(answer1).thenReturn(answer2);
resolver.setAddressResolver(mockResolver);
resolver.start(mockListener);
resolver.start(mockObserver);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
verify(mockObserver).onResult(resultCaptor.capture());
assertAnswerMatches(answer1, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
resolver.refresh();
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener, times(2)).onAddresses(resultCaptor.capture(), any(Attributes.class));
verify(mockObserver, times(2)).onResult(resultCaptor.capture());
assertAnswerMatches(answer2, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
@ -321,12 +320,12 @@ public class DnsNameResolverTest {
}
});
nr.start(mockListener);
nr.start(mockObserver);
assertThat(fakeExecutor.runDueTasks()).isEqualTo(1);
ArgumentCaptor<Status> ac = ArgumentCaptor.forClass(Status.class);
verify(mockListener).onError(ac.capture());
verifyNoMoreInteractions(mockListener);
verify(mockObserver).onError(ac.capture());
verifyNoMoreInteractions(mockObserver);
assertThat(ac.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(ac.getValue().getDescription()).contains("No DNS backend or balancer addresses");
}
@ -346,9 +345,9 @@ public class DnsNameResolverTest {
.thenThrow(new AssertionError("should not called twice"));
resolver.setAddressResolver(mockResolver);
resolver.start(mockListener);
resolver.start(mockObserver);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
verify(mockObserver).onResult(resultCaptor.capture());
assertAnswerMatches(answer1, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
@ -356,7 +355,7 @@ public class DnsNameResolverTest {
resolver.refresh();
assertEquals(0, fakeExecutor.runDueTasks());
assertEquals(0, fakeClock.numPendingTasks());
verifyNoMoreInteractions(mockListener);
verifyNoMoreInteractions(mockObserver);
resolver.shutdown();
@ -379,9 +378,9 @@ public class DnsNameResolverTest {
.thenThrow(new AssertionError("should not reach here."));
resolver.setAddressResolver(mockResolver);
resolver.start(mockListener);
resolver.start(mockObserver);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
verify(mockObserver).onResult(resultCaptor.capture());
assertAnswerMatches(answer, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
@ -390,7 +389,7 @@ public class DnsNameResolverTest {
resolver.refresh();
assertEquals(0, fakeExecutor.runDueTasks());
assertEquals(0, fakeClock.numPendingTasks());
verifyNoMoreInteractions(mockListener);
verifyNoMoreInteractions(mockObserver);
resolver.shutdown();
@ -413,16 +412,16 @@ public class DnsNameResolverTest {
.thenReturn(answer2);
resolver.setAddressResolver(mockResolver);
resolver.start(mockListener);
resolver.start(mockObserver);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
verify(mockObserver).onResult(resultCaptor.capture());
assertAnswerMatches(answer1, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
fakeTicker.advance(ttl + 1, TimeUnit.SECONDS);
resolver.refresh();
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener, times(2)).onAddresses(resultCaptor.capture(), any(Attributes.class));
verify(mockObserver, times(2)).onResult(resultCaptor.capture());
assertAnswerMatches(answer2, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
@ -455,9 +454,9 @@ public class DnsNameResolverTest {
when(mockResolver.resolveAddress(anyString())).thenReturn(answer1).thenReturn(answer2);
resolver.setAddressResolver(mockResolver);
resolver.start(mockListener);
resolver.start(mockObserver);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
verify(mockObserver).onResult(resultCaptor.capture());
assertAnswerMatches(answer1, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
@ -465,12 +464,12 @@ public class DnsNameResolverTest {
resolver.refresh();
assertEquals(0, fakeExecutor.runDueTasks());
assertEquals(0, fakeClock.numPendingTasks());
verifyNoMoreInteractions(mockListener);
verifyNoMoreInteractions(mockObserver);
fakeTicker.advance(1, TimeUnit.SECONDS);
resolver.refresh();
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener, times(2)).onAddresses(resultCaptor.capture(), any(Attributes.class));
verify(mockObserver, times(2)).onResult(resultCaptor.capture());
assertAnswerMatches(answer2, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
@ -632,11 +631,11 @@ public class DnsNameResolverTest {
AddressResolver mockAddressResolver = mock(AddressResolver.class);
when(mockAddressResolver.resolveAddress(anyString())).thenThrow(new AssertionError());
resolver.setAddressResolver(mockAddressResolver);
resolver.start(mockListener);
resolver.start(mockObserver);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
List<EquivalentAddressGroup> result = resultCaptor.getValue();
verify(mockObserver).onResult(resultCaptor.capture());
List<EquivalentAddressGroup> result = resultCaptor.getValue().getServers();
assertThat(result).hasSize(1);
EquivalentAddressGroup eag = result.get(0);
assertThat(eag.getAddresses()).hasSize(1);
@ -1072,10 +1071,10 @@ public class DnsNameResolverTest {
}
private static void assertAnswerMatches(
List<InetAddress> addrs, int port, List<EquivalentAddressGroup> results) {
assertEquals(addrs.size(), results.size());
List<InetAddress> addrs, int port, ResolutionResult resolutionResult) {
assertEquals(addrs.size(), resolutionResult.getServers().size());
for (int i = 0; i < addrs.size(); i++) {
EquivalentAddressGroup addrGroup = results.get(i);
EquivalentAddressGroup addrGroup = resolutionResult.getServers().get(i);
InetSocketAddress socketAddr =
(InetSocketAddress) Iterables.getOnlyElement(addrGroup.getAddresses());
assertEquals("Addr " + i, port, socketAddr.getPort());

View File

@ -25,6 +25,7 @@ import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.ForwardingTestUtil;
import io.grpc.NameResolver;
import io.grpc.NameResolver.ResolutionResult;
import io.grpc.Status;
import java.lang.reflect.Method;
import java.util.Collections;
@ -60,7 +61,8 @@ public class ForwardingNameResolverTest {
}
@Test
public void start() {
@SuppressWarnings("deprecation") // this will be removed in 1.21.0
public void start_listener() {
NameResolver.Listener listener = new NameResolver.Listener() {
@Override
public void onAddresses(List<EquivalentAddressGroup> servers, Attributes attributes) { }
@ -72,4 +74,21 @@ public class ForwardingNameResolverTest {
forwarder.start(listener);
verify(delegate).start(listener);
}
@Test
public void start_observer() {
NameResolver.Observer observer = new NameResolver.Observer() {
@Override
public void onResult(ResolutionResult result) {
}
@Override
public void onError(Status error) { }
};
forwarder.start(observer);
verify(delegate).start(observer);
}
}

View File

@ -176,7 +176,7 @@ public class ManagedChannelImplGetNameResolverTest {
return uri.getAuthority();
}
@Override public void start(final Listener listener) {}
@Override public void start(final Observer observer) {}
@Override public void shutdown() {}
}

View File

@ -53,6 +53,7 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.NameResolver;
import io.grpc.NameResolver.ResolutionResult;
import io.grpc.Status;
import io.grpc.StringMarshaller;
import io.grpc.internal.FakeClock.ScheduledTask;
@ -138,10 +139,11 @@ public class ManagedChannelImplIdlenessTest {
@Mock private NameResolver.Factory mockNameResolverFactory;
@Mock private ClientCall.Listener<Integer> mockCallListener;
@Mock private ClientCall.Listener<Integer> mockCallListener2;
@Captor private ArgumentCaptor<NameResolver.Listener> nameResolverListenerCaptor;
@Captor private ArgumentCaptor<NameResolver.Observer> nameResolverObserverCaptor;
private BlockingQueue<MockClientTransportInfo> newTransports;
@Before
@SuppressWarnings("deprecation") // For NameResolver.Listener
public void setUp() {
LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider);
when(mockNameResolver.getServiceAuthority()).thenReturn(AUTHORITY);
@ -193,6 +195,7 @@ public class ManagedChannelImplIdlenessTest {
any(ClientTransportFactory.ClientTransportOptions.class),
any(ChannelLogger.class));
verify(mockNameResolver, never()).start(any(NameResolver.Listener.class));
verify(mockNameResolver, never()).start(any(NameResolver.Observer.class));
}
@After
@ -216,10 +219,15 @@ public class ManagedChannelImplIdlenessTest {
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
verify(mockNameResolver).start(nameResolverListenerCaptor.capture());
verify(mockNameResolver).start(nameResolverObserverCaptor.capture());
// Simulate new address resolved to make sure the LoadBalancer is correctly linked to
// the NameResolver.
nameResolverListenerCaptor.getValue().onAddresses(servers, Attributes.EMPTY);
ResolutionResult resolutionResult =
ResolutionResult.newBuilder()
.setServers(servers)
.setAttributes(Attributes.EMPTY)
.build();
nameResolverObserverCaptor.getValue().onResult(resolutionResult);
verify(mockLoadBalancer).handleResolvedAddressGroups(servers, Attributes.EMPTY);
}

View File

@ -91,6 +91,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.NameResolver;
import io.grpc.NameResolver.Helper.ConfigOrError;
import io.grpc.NameResolver.ResolutionResult;
import io.grpc.ProxiedSocketAddress;
import io.grpc.ProxyDetector;
import io.grpc.SecurityLevel;
@ -674,7 +675,7 @@ public class ManagedChannelImplTest {
.handleSubchannelState(same(subchannel2), stateInfoCaptor.capture());
assertSame(CONNECTING, stateInfoCaptor.getValue().getState());
resolver.listener.onError(resolutionError);
resolver.observer.onError(resolutionError);
verify(mockLoadBalancer).handleNameResolutionError(resolutionError);
verifyNoMoreInteractions(mockLoadBalancer);
@ -684,7 +685,7 @@ public class ManagedChannelImplTest {
// No more callback should be delivered to LoadBalancer after it's shut down
transportInfo2.listener.transportReady();
resolver.listener.onError(resolutionError);
resolver.observer.onError(resolutionError);
resolver.resolved();
verifyNoMoreInteractions(mockLoadBalancer);
}
@ -801,7 +802,7 @@ public class ManagedChannelImplTest {
assertEquals(0, timer.numPendingTasks());
// Verify that the successful resolution reset the backoff policy
resolver.listener.onError(error);
resolver.observer.onError(error);
timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1);
assertEquals(3, resolver.refreshCalled);
timer.forwardNanos(1);
@ -2585,25 +2586,31 @@ public class ManagedChannelImplTest {
createChannel();
int prevSize = getStats(channel).channelTrace.events.size();
nameResolverFactory.resolvers.get(0).listener.onAddresses(
Collections.singletonList(new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
Attributes.EMPTY);
ResolutionResult resolutionResult1 = ResolutionResult.newBuilder()
.setServers(Collections.singletonList(
new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
.setAttributes(Attributes.EMPTY)
.build();
nameResolverFactory.resolvers.get(0).observer.onResult(resolutionResult1);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
prevSize = getStats(channel).channelTrace.events.size();
nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
nameResolverFactory.resolvers.get(0).observer.onError(Status.INTERNAL);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
prevSize = getStats(channel).channelTrace.events.size();
nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
nameResolverFactory.resolvers.get(0).observer.onError(Status.INTERNAL);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
prevSize = getStats(channel).channelTrace.events.size();
nameResolverFactory.resolvers.get(0).listener.onAddresses(
Collections.singletonList(new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
Attributes.EMPTY);
ResolutionResult resolutionResult2 = ResolutionResult.newBuilder()
.setServers(Collections.singletonList(
new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
.setAttributes(Attributes.EMPTY)
.build();
nameResolverFactory.resolvers.get(0).observer.onResult(resolutionResult2);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
}
@ -2623,10 +2630,13 @@ public class ManagedChannelImplTest {
Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, new HashMap<String, Object>())
.build();
nameResolverFactory.resolvers.get(0).listener.onAddresses(
Collections.singletonList(new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
attributes);
ResolutionResult resolutionResult1 = ResolutionResult.newBuilder()
.setServers(Collections.singletonList(
new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
.setAttributes(attributes)
.build();
nameResolverFactory.resolvers.get(0).observer.onResult(resolutionResult1);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
assertThat(getStats(channel).channelTrace.events.get(prevSize))
.isEqualTo(new ChannelTrace.Event.Builder()
@ -2636,10 +2646,13 @@ public class ManagedChannelImplTest {
.build());
prevSize = getStats(channel).channelTrace.events.size();
nameResolverFactory.resolvers.get(0).listener.onAddresses(
Collections.singletonList(new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
attributes);
ResolutionResult resolutionResult2 = ResolutionResult.newBuilder().setServers(
Collections.singletonList(
new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
.setAttributes(attributes)
.build();
nameResolverFactory.resolvers.get(0).observer.onResult(resolutionResult2);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
prevSize = getStats(channel).channelTrace.events.size();
@ -2650,10 +2663,13 @@ public class ManagedChannelImplTest {
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig)
.build();
timer.forwardNanos(1234);
nameResolverFactory.resolvers.get(0).listener.onAddresses(
Collections.singletonList(new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
attributes);
ResolutionResult resolutionResult3 = ResolutionResult.newBuilder()
.setServers(Collections.singletonList(
new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
.setAttributes(attributes)
.build();
nameResolverFactory.resolvers.get(0).observer.onResult(resolutionResult3);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
assertThat(getStats(channel).channelTrace.events.get(prevSize))
.isEqualTo(new ChannelTrace.Event.Builder()
@ -3192,7 +3208,7 @@ public class ManagedChannelImplTest {
final List<EquivalentAddressGroup> addresses =
ImmutableList.of(new EquivalentAddressGroup(new SocketAddress() {}));
final class FakeNameResolver extends NameResolver {
Listener listener;
Observer observer;
@Override
public String getServiceAuthority() {
@ -3200,13 +3216,17 @@ public class ManagedChannelImplTest {
}
@Override
public void start(Listener listener) {
this.listener = listener;
listener.onAddresses(addresses,
Attributes.newBuilder()
.set(
GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
ImmutableMap.<String, Object>of("loadBalancingPolicy", "kaboom"))
public void start(Observer observer) {
this.observer = observer;
observer.onResult(
ResolutionResult.newBuilder()
.setServers(addresses)
.setAttributes(
Attributes.newBuilder()
.set(
GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
ImmutableMap.<String, Object>of("loadBalancingPolicy", "kaboom"))
.build())
.build());
}
@ -3259,12 +3279,16 @@ public class ManagedChannelImplTest {
// ok the service config is bad, let's fix it.
factory.resolver.listener.onAddresses(addresses,
Attributes.newBuilder()
.set(
GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
ImmutableMap.<String, Object>of("loadBalancingPolicy", "round_robin"))
.build());
factory.resolver.observer.onResult(
ResolutionResult.newBuilder()
.setServers(addresses)
.setAttributes(
Attributes.newBuilder()
.set(
GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
ImmutableMap.<String, Object>of("loadBalancingPolicy", "round_robin"))
.build())
.build());
ClientCall<Void, Void> call2 = mychannel.newCall(
TestMethodDescriptors.voidMethod(),
@ -3294,7 +3318,7 @@ public class ManagedChannelImplTest {
}
@Override
public void start(Listener listener) {
public void start(Observer observer) {
}
@Override
@ -3803,7 +3827,7 @@ public class ManagedChannelImplTest {
}
final class FakeNameResolver extends NameResolver {
Listener listener;
Observer observer;
boolean shutdown;
int refreshCalled;
Status error;
@ -3816,8 +3840,8 @@ public class ManagedChannelImplTest {
return expectedUri.getAuthority();
}
@Override public void start(final Listener listener) {
this.listener = listener;
@Override public void start(Observer observer) {
this.observer = observer;
if (resolvedAtStart) {
resolved();
}
@ -3830,10 +3854,14 @@ public class ManagedChannelImplTest {
void resolved() {
if (error != null) {
listener.onError(error);
observer.onError(error);
return;
}
listener.onAddresses(servers, nextResolvedAttributes.get());
observer.onResult(
ResolutionResult.newBuilder()
.setServers(servers)
.setAttributes(nextResolvedAttributes.get())
.build());
}
@Override public void shutdown() {

View File

@ -81,7 +81,7 @@ public class OverrideAuthorityNameResolverTest {
NameResolver overrideResolver =
factory.newNameResolver(URI.create("dns:///localhost:443"), HELPER);
assertNotNull(overrideResolver);
NameResolver.Listener listener = mock(NameResolver.Listener.class);
NameResolver.Observer listener = mock(NameResolver.Observer.class);
overrideResolver.start(listener);
verify(mockResolver).start(listener);