diff --git a/xds/src/main/java/io/grpc/xds/CsdsService.java b/xds/src/main/java/io/grpc/xds/CsdsService.java index 0102836660..a296beb45d 100644 --- a/xds/src/main/java/io/grpc/xds/CsdsService.java +++ b/xds/src/main/java/io/grpc/xds/CsdsService.java @@ -39,6 +39,8 @@ import io.grpc.xds.client.XdsClient.ResourceMetadata; import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus; import io.grpc.xds.client.XdsClient.ResourceMetadata.UpdateFailureState; import io.grpc.xds.client.XdsResourceType; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -117,43 +119,58 @@ public final class CsdsService implements BindableService { private boolean handleRequest( ClientStatusRequest request, StreamObserver responseObserver) { - StatusException error; - try { - responseObserver.onNext(getConfigDumpForRequest(request)); - return true; - } catch (StatusException e) { - error = e; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e); - error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException(); - } catch (RuntimeException e) { - logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e); - error = - Status.INTERNAL.withDescription("Unexpected internal error").withCause(e).asException(); + StatusException error = null; + + if (request.getNodeMatchersCount() > 0) { + error = new StatusException( + Status.INVALID_ARGUMENT.withDescription("node_matchers not supported")); + } else { + List targets = xdsClientPoolFactory.getTargets(); + List clientConfigs = new ArrayList<>(targets.size()); + + for (int i = 0; i < targets.size() && error == null; i++) { + try { + ClientConfig clientConfig = getConfigForRequest(targets.get(i)); + if (clientConfig != null) { + clientConfigs.add(clientConfig); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e); + error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException(); + } catch (RuntimeException e) { + logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e); + error = Status.INTERNAL.withDescription("Unexpected internal error").withCause(e) + .asException(); + } + } + + try { + responseObserver.onNext(getStatusResponse(clientConfigs)); + } catch (RuntimeException e) { + logger.log(Level.WARNING, "Unexpected error while processing CSDS config dump", e); + error = Status.INTERNAL.withDescription("Unexpected internal error").withCause(e) + .asException(); + } + } + + if (error == null) { + return true; // All clients reported without error } responseObserver.onError(error); return false; } - private ClientStatusResponse getConfigDumpForRequest(ClientStatusRequest request) - throws StatusException, InterruptedException { - if (request.getNodeMatchersCount() > 0) { - throw new StatusException( - Status.INVALID_ARGUMENT.withDescription("node_matchers not supported")); - } - - ObjectPool xdsClientPool = xdsClientPoolFactory.get(); + private ClientConfig getConfigForRequest(String target) throws InterruptedException { + ObjectPool xdsClientPool = xdsClientPoolFactory.get(target); if (xdsClientPool == null) { - return ClientStatusResponse.getDefaultInstance(); + return null; } XdsClient xdsClient = null; try { xdsClient = xdsClientPool.getObject(); - return ClientStatusResponse.newBuilder() - .addConfig(getClientConfigForXdsClient(xdsClient)) - .build(); + return getClientConfigForXdsClient(xdsClient, target); } finally { if (xdsClient != null) { xdsClientPool.returnObject(xdsClient); @@ -161,9 +178,18 @@ public final class CsdsService implements BindableService { } } + private ClientStatusResponse getStatusResponse(List clientConfigs) { + if (clientConfigs.isEmpty()) { + return ClientStatusResponse.getDefaultInstance(); + } + return ClientStatusResponse.newBuilder().addAllConfig(clientConfigs).build(); + } + @VisibleForTesting - static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) throws InterruptedException { + static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient, String target) + throws InterruptedException { ClientConfig.Builder builder = ClientConfig.newBuilder() + .setClientScope(target) .setNode(xdsClient.getBootstrapInfo().node().toEnvoyProtoNode()); Map, Map> metadataByType = diff --git a/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java index 39b9ed0d09..0073cce1a8 100644 --- a/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java @@ -36,6 +36,6 @@ public final class InternalSharedXdsClientPoolProvider { public static ObjectPool getOrCreate(String target) throws XdsInitializationException { - return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(); + return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target); } } diff --git a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java index 5ae1f5bbce..c9195896d8 100644 --- a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.xds.GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ObjectPool; @@ -32,6 +33,7 @@ import io.grpc.xds.client.XdsClientImpl; import io.grpc.xds.client.XdsInitializationException; import io.grpc.xds.internal.security.TlsContextManagerImpl; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; @@ -53,7 +55,7 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory { private final Bootstrapper bootstrapper; private final Object lock = new Object(); private final AtomicReference> bootstrapOverride = new AtomicReference<>(); - private volatile ObjectPool xdsClientPool; + private final Map> targetToXdsClientMap = new ConcurrentHashMap<>(); SharedXdsClientPoolProvider() { this(new GrpcBootstrapperImpl()); @@ -75,16 +77,16 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory { @Override @Nullable - public ObjectPool get() { - return xdsClientPool; + public ObjectPool get(String target) { + return targetToXdsClientMap.get(target); } @Override - public ObjectPool getOrCreate() throws XdsInitializationException { - ObjectPool ref = xdsClientPool; + public ObjectPool getOrCreate(String target) throws XdsInitializationException { + ObjectPool ref = targetToXdsClientMap.get(target); if (ref == null) { synchronized (lock) { - ref = xdsClientPool; + ref = targetToXdsClientMap.get(target); if (ref == null) { BootstrapInfo bootstrapInfo; Map rawBootstrap = bootstrapOverride.get(); @@ -96,13 +98,20 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory { if (bootstrapInfo.servers().isEmpty()) { throw new XdsInitializationException("No xDS server provided"); } - ref = xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo); + ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target); + targetToXdsClientMap.put(target, ref); } } } return ref; } + @Override + public ImmutableList getTargets() { + return ImmutableList.copyOf(targetToXdsClientMap.keySet()); + } + + private static class SharedXdsClientPoolProviderHolder { private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider(); } @@ -110,7 +119,11 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory { @ThreadSafe @VisibleForTesting static class RefCountedXdsClientObjectPool implements ObjectPool { + + private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER = + new ExponentialBackoffPolicy.Provider(); private final BootstrapInfo bootstrapInfo; + private final String target; // The target associated with the xDS client. private final Object lock = new Object(); @GuardedBy("lock") private ScheduledExecutorService scheduler; @@ -120,8 +133,9 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory { private int refCount; @VisibleForTesting - RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo) { + RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target) { this.bootstrapInfo = checkNotNull(bootstrapInfo); + this.target = target; } @Override @@ -136,7 +150,7 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory { DEFAULT_XDS_TRANSPORT_FACTORY, bootstrapInfo, scheduler, - new ExponentialBackoffPolicy.Provider(), + BACKOFF_POLICY_PROVIDER, GrpcUtil.STOPWATCH_SUPPLIER, TimeProvider.SYSTEM_TIME_PROVIDER, MessagePrinter.INSTANCE, @@ -167,5 +181,10 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory { return xdsClient; } } + + public String getTarget() { + return target; + } } + } diff --git a/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java b/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java index c649b3b306..313eb67511 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java @@ -19,6 +19,7 @@ package io.grpc.xds; import io.grpc.internal.ObjectPool; import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsInitializationException; +import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -26,7 +27,9 @@ interface XdsClientPoolFactory { void setBootstrapOverride(Map bootstrap); @Nullable - ObjectPool get(); + ObjectPool get(String target); - ObjectPool getOrCreate() throws XdsInitializationException; + ObjectPool getOrCreate(String target) throws XdsInitializationException; + + List getTargets(); } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 9ad9b6e82f..f0329387fc 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -66,6 +66,7 @@ import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient.ResourceWatcher; import io.grpc.xds.client.XdsLogger; import io.grpc.xds.client.XdsLogger.XdsLogLevel; +import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -104,6 +105,7 @@ final class XdsNameResolver extends NameResolver { private final XdsLogger logger; @Nullable private final String targetAuthority; + private final String target; private final String serviceAuthority; // Encoded version of the service authority as per // https://datatracker.ietf.org/doc/html/rfc3986#section-3.2. @@ -133,23 +135,24 @@ final class XdsNameResolver extends NameResolver { private boolean receivedConfig; XdsNameResolver( - @Nullable String targetAuthority, String name, @Nullable String overrideAuthority, + URI targetUri, String name, @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser, SynchronizationContext syncContext, ScheduledExecutorService scheduler, @Nullable Map bootstrapOverride) { - this(targetAuthority, name, overrideAuthority, serviceConfigParser, syncContext, scheduler, - SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance, - FilterRegistry.getDefaultRegistry(), bootstrapOverride); + this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser, + syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(), + ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride); } @VisibleForTesting XdsNameResolver( - @Nullable String targetAuthority, String name, @Nullable String overrideAuthority, - ServiceConfigParser serviceConfigParser, + URI targetUri, @Nullable String targetAuthority, String name, + @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser, SynchronizationContext syncContext, ScheduledExecutorService scheduler, XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random, FilterRegistry filterRegistry, @Nullable Map bootstrapOverride) { this.targetAuthority = targetAuthority; + target = targetUri.toString(); // The name might have multiple slashes so encode it before verifying. serviceAuthority = checkNotNull(name, "name"); @@ -180,7 +183,7 @@ final class XdsNameResolver extends NameResolver { public void start(Listener2 listener) { this.listener = checkNotNull(listener, "listener"); try { - xdsClientPool = xdsClientPoolFactory.getOrCreate(); + xdsClientPool = xdsClientPoolFactory.getOrCreate(target); } catch (Exception e) { listener.onError( Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e)); diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java index 598be07fcd..8d0e59eaa9 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java @@ -78,7 +78,7 @@ public final class XdsNameResolverProvider extends NameResolverProvider { targetUri); String name = targetPath.substring(1); return new XdsNameResolver( - targetUri.getAuthority(), name, args.getOverrideAuthority(), + targetUri, name, args.getOverrideAuthority(), args.getServiceConfigParser(), args.getSynchronizationContext(), args.getScheduledExecutorService(), bootstrapOverride); diff --git a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java index bf8603fb3e..dfb7c4fb7d 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java @@ -171,7 +171,7 @@ final class XdsServerWrapper extends Server { private void internalStart() { try { - xdsClientPool = xdsClientPoolFactory.getOrCreate(); + xdsClientPool = xdsClientPoolFactory.getOrCreate(""); } catch (Exception e) { StatusException statusException = Status.UNAVAILABLE.withDescription( "Failed to initialize xDS").withCause(e).asException(); diff --git a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java index bf330b1007..63b9cda043 100644 --- a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java +++ b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java @@ -54,6 +54,7 @@ import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient.ResourceMetadata; import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus; import io.grpc.xds.client.XdsResourceType; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -85,6 +86,7 @@ public class CsdsServiceTest { private static final XdsResourceType CDS = XdsClusterResource.getInstance(); private static final XdsResourceType RDS = XdsRouteConfigureResource.getInstance(); private static final XdsResourceType EDS = XdsEndpointResource.getInstance(); + public static final String FAKE_CLIENT_SCOPE = "fake"; @RunWith(JUnit4.class) public static class ServiceTests { @@ -198,13 +200,13 @@ public class CsdsServiceTest { @Override @Nullable - public ObjectPool get() { + public ObjectPool get(String target) { // xDS client not ready on the first call, then becomes ready. if (!calledOnce) { calledOnce = true; return null; } else { - return super.get(); + return super.get(target); } } }); @@ -267,11 +269,51 @@ public class CsdsServiceTest { assertThat(responseObserver.getError()).isNull(); } + @Test + public void multipleXdsClients() { + FakeXdsClient xdsClient1 = new FakeXdsClient(); + FakeXdsClient xdsClient2 = new FakeXdsClient(); + Map clientMap = new HashMap<>(); + clientMap.put("target1", xdsClient1); + clientMap.put("target2", xdsClient2); + FakeXdsClientPoolFactory factory = new FakeXdsClientPoolFactory(clientMap); + CsdsService csdsService = new CsdsService(factory); + grpcServerRule.getServiceRegistry().addService(csdsService); + + StreamRecorder responseObserver = StreamRecorder.create(); + StreamObserver requestObserver = + csdsAsyncStub.streamClientStatus(responseObserver); + + requestObserver.onNext(REQUEST); + requestObserver.onCompleted(); + + List responses = responseObserver.getValues(); + assertThat(responses).hasSize(1); + Collection targets = verifyMultiResponse(responses.get(0), 2); + assertThat(targets).containsExactly("target1", "target2"); + responseObserver.onCompleted(); + } + private void verifyResponse(ClientStatusResponse response) { assertThat(response.getConfigCount()).isEqualTo(1); ClientConfig clientConfig = response.getConfig(0); verifyClientConfigNode(clientConfig); verifyClientConfigNoResources(XDS_CLIENT_NO_RESOURCES, clientConfig); + assertThat(clientConfig.getClientScope()).isEmpty(); + } + + private Collection verifyMultiResponse(ClientStatusResponse response, int numExpected) { + assertThat(response.getConfigCount()).isEqualTo(numExpected); + + List clientScopes = new ArrayList<>(); + for (int i = 0; i < numExpected; i++) { + ClientConfig clientConfig = response.getConfig(i); + verifyClientConfigNode(clientConfig); + verifyClientConfigNoResources(XDS_CLIENT_NO_RESOURCES, clientConfig); + clientScopes.add(clientConfig.getClientScope()); + } + + return clientScopes; } private void verifyRequestInvalidResponseStatus(Status status) { @@ -350,9 +392,11 @@ public class CsdsServiceTest { ); } }; - ClientConfig clientConfig = CsdsService.getClientConfigForXdsClient(fakeXdsClient); + ClientConfig clientConfig = CsdsService.getClientConfigForXdsClient(fakeXdsClient, + FAKE_CLIENT_SCOPE); verifyClientConfigNode(clientConfig); + assertThat(clientConfig.getClientScope()).isEqualTo(FAKE_CLIENT_SCOPE); // Minimal verification to confirm that the data/metadata XdsClient provides, // is propagated to the correct resource types. @@ -390,9 +434,11 @@ public class CsdsServiceTest { @Test public void getClientConfigForXdsClient_noSubscribedResources() throws InterruptedException { - ClientConfig clientConfig = CsdsService.getClientConfigForXdsClient(XDS_CLIENT_NO_RESOURCES); + ClientConfig clientConfig = + CsdsService.getClientConfigForXdsClient(XDS_CLIENT_NO_RESOURCES, FAKE_CLIENT_SCOPE); verifyClientConfigNode(clientConfig); verifyClientConfigNoResources(XDS_CLIENT_NO_RESOURCES, clientConfig); + assertThat(clientConfig.getClientScope()).isEqualTo(FAKE_CLIENT_SCOPE); } } @@ -460,22 +506,35 @@ public class CsdsServiceTest { public Map> getSubscribedResourceTypesWithTypeUrl() { return ImmutableMap.of(); } + } private static class FakeXdsClientPoolFactory implements XdsClientPoolFactory { - @Nullable private final XdsClient xdsClient; + private final Map xdsClientMap = new HashMap<>(); + private boolean isOldStyle + ; private FakeXdsClientPoolFactory(@Nullable XdsClient xdsClient) { - this.xdsClient = xdsClient; + if (xdsClient != null) { + xdsClientMap.put("", xdsClient); + } + isOldStyle = true; + } + + private FakeXdsClientPoolFactory(Map xdsClientMap) { + this.xdsClientMap.putAll(xdsClientMap); + isOldStyle = false; } @Override @Nullable - public ObjectPool get() { + public ObjectPool get(String target) { + String targetToUse = isOldStyle ? "" : target; + return new ObjectPool() { @Override public XdsClient getObject() { - return xdsClient; + return xdsClientMap.get(targetToUse); } @Override @@ -485,13 +544,18 @@ public class CsdsServiceTest { }; } + @Override + public List getTargets() { + return new ArrayList<>(xdsClientMap.keySet()); + } + @Override public void setBootstrapOverride(Map bootstrap) { throw new UnsupportedOperationException("Should not be called"); } @Override - public ObjectPool getOrCreate() { + public ObjectPool getOrCreate(String target) { throw new UnsupportedOperationException("Should not be called"); } } diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 6b04edcb9b..d41630cdb4 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -340,8 +340,7 @@ public abstract class GrpcXdsClientImplTestBase { } }; - xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, - ignoreResourceDeletion()); + xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, ignoreResourceDeletion()); BootstrapInfo bootstrapInfo = Bootstrapper.BootstrapInfo.builder() .servers(Collections.singletonList(xdsServerInfo)) @@ -2974,6 +2973,7 @@ public abstract class GrpcXdsClientImplTestBase { anotherWatcher, fakeWatchClock.getScheduledExecutorService()); verifyResourceMetadataRequested(CDS, CDS_RESOURCE); verifyResourceMetadataRequested(CDS, anotherCdsResource); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); call.verifyRequest(CDS, Arrays.asList(CDS_RESOURCE, anotherCdsResource), "", "", NODE); assertThat(fakeWatchClock.runDueTasks()).isEqualTo(2); diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java index 2b2ce5cbd7..40a9bff514 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java @@ -17,6 +17,7 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; +import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -144,7 +145,8 @@ public class GrpcXdsClientImplV3Test extends GrpcXdsClientImplTestBase { assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended adsEnded.set(false); @SuppressWarnings("unchecked") - StreamObserver requestObserver = mock(StreamObserver.class); + StreamObserver requestObserver = + mock(StreamObserver.class, delegatesTo(new MockStreamObserver())); DiscoveryRpcCall call = new DiscoveryRpcCallV3(requestObserver, responseObserver); resourceDiscoveryCalls.offer(call); Context.current().addListener( @@ -874,6 +876,19 @@ public class GrpcXdsClientImplV3Test extends GrpcXdsClientImplTestBase { } return node.equals(argument.getNode()); } + + @Override + public String toString() { + return "DiscoveryRequestMatcher{" + + "node=" + node + + ", versionInfo='" + versionInfo + '\'' + + ", typeUrl='" + typeUrl + '\'' + + ", resources=" + resources + + ", responseNonce='" + responseNonce + '\'' + + ", errorCode=" + errorCode + + ", errorMessages=" + errorMessages + + '}'; + } } /** @@ -901,4 +916,23 @@ public class GrpcXdsClientImplV3Test extends GrpcXdsClientImplTestBase { return actual.equals(expected); } } + + private static class MockStreamObserver implements StreamObserver { + private final List requests = new ArrayList<>(); + + @Override + public void onNext(DiscoveryRequest value) { + requests.add(value); + } + + @Override + public void onError(Throwable t) { + // Ignore + } + + @Override + public void onCompleted() { + // Ignore + } + } } diff --git a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java index 0687b51aea..ee164938b2 100644 --- a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java +++ b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java @@ -51,6 +51,7 @@ public class SharedXdsClientPoolProviderTest { @Rule public final ExpectedException thrown = ExpectedException.none(); private final Node node = Node.newBuilder().setId("SharedXdsClientPoolProviderTest").build(); + private static final String DUMMY_TARGET = "dummy"; @Mock private GrpcBootstrapperImpl bootstrapper; @@ -63,8 +64,8 @@ public class SharedXdsClientPoolProviderTest { SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper); thrown.expect(XdsInitializationException.class); thrown.expectMessage("No xDS server provided"); - provider.getOrCreate(); - assertThat(provider.get()).isNull(); + provider.getOrCreate(DUMMY_TARGET); + assertThat(provider.get(DUMMY_TARGET)).isNull(); } @Test @@ -75,12 +76,12 @@ public class SharedXdsClientPoolProviderTest { when(bootstrapper.bootstrap()).thenReturn(bootstrapInfo); SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper); - assertThat(provider.get()).isNull(); - ObjectPool xdsClientPool = provider.getOrCreate(); + assertThat(provider.get(DUMMY_TARGET)).isNull(); + ObjectPool xdsClientPool = provider.getOrCreate(DUMMY_TARGET); verify(bootstrapper).bootstrap(); - assertThat(provider.getOrCreate()).isSameInstanceAs(xdsClientPool); - assertThat(provider.get()).isNotNull(); - assertThat(provider.get()).isSameInstanceAs(xdsClientPool); + assertThat(provider.getOrCreate(DUMMY_TARGET)).isSameInstanceAs(xdsClientPool); + assertThat(provider.get(DUMMY_TARGET)).isNotNull(); + assertThat(provider.get(DUMMY_TARGET)).isSameInstanceAs(xdsClientPool); verifyNoMoreInteractions(bootstrapper); } @@ -90,7 +91,7 @@ public class SharedXdsClientPoolProviderTest { BootstrapInfo bootstrapInfo = BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); RefCountedXdsClientObjectPool xdsClientPool = - new RefCountedXdsClientObjectPool(bootstrapInfo); + new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET); assertThat(xdsClientPool.getXdsClientForTest()).isNull(); XdsClient xdsClient = xdsClientPool.getObject(); assertThat(xdsClientPool.getXdsClientForTest()).isNotNull(); @@ -103,7 +104,7 @@ public class SharedXdsClientPoolProviderTest { BootstrapInfo bootstrapInfo = BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); RefCountedXdsClientObjectPool xdsClientPool = - new RefCountedXdsClientObjectPool(bootstrapInfo); + new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET); // getObject once XdsClient xdsClient = xdsClientPool.getObject(); assertThat(xdsClient).isNotNull(); @@ -123,7 +124,7 @@ public class SharedXdsClientPoolProviderTest { BootstrapInfo bootstrapInfo = BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); RefCountedXdsClientObjectPool xdsClientPool = - new RefCountedXdsClientObjectPool(bootstrapInfo); + new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET); XdsClient xdsClient1 = xdsClientPool.getObject(); assertThat(xdsClientPool.returnObject(xdsClient1)).isNull(); assertThat(xdsClient1.isShutDown()).isTrue(); diff --git a/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java b/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java index 149c1d6170..0b8e89de72 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java @@ -72,12 +72,13 @@ public class XdsClientFederationTest { private ObjectPool xdsClientPool; private XdsClient xdsClient; + private static final String DUMMY_TARGET = "dummy"; @Before public void setUp() throws XdsInitializationException { SharedXdsClientPoolProvider clientPoolProvider = new SharedXdsClientPoolProvider(); clientPoolProvider.setBootstrapOverride(defaultBootstrapOverride()); - xdsClientPool = clientPoolProvider.getOrCreate(); + xdsClientPool = clientPoolProvider.getOrCreate(DUMMY_TARGET); xdsClient = xdsClientPool.getObject(); } diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index 28871850e7..24c2a43b83 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -95,11 +95,15 @@ import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsInitializationException; import io.grpc.xds.client.XdsResourceType; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -166,15 +170,22 @@ public class XdsNameResolverTest { private XdsNameResolver resolver; private TestCall testCall; private boolean originalEnableTimeout; + private URI targetUri; @Before public void setUp() { + try { + targetUri = new URI(AUTHORITY); + } catch (URISyntaxException e) { + targetUri = null; + } + originalEnableTimeout = XdsNameResolver.enableTimeout; XdsNameResolver.enableTimeout = true; FilterRegistry filterRegistry = FilterRegistry.newRegistry().register( new FaultFilter(mockRandom, new AtomicLong()), RouterFilter.INSTANCE); - resolver = new XdsNameResolver(null, AUTHORITY, null, + resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, filterRegistry, null); } @@ -199,16 +210,22 @@ public class XdsNameResolverTest { @Override @Nullable - public ObjectPool get() { + public ObjectPool get(String target) { throw new UnsupportedOperationException("Should not be called"); } @Override - public ObjectPool getOrCreate() throws XdsInitializationException { + public ObjectPool getOrCreate(String target) throws XdsInitializationException { throw new XdsInitializationException("Fail to read bootstrap file"); } + + @Override + public List getTargets() { + return null; + } }; - resolver = new XdsNameResolver(null, AUTHORITY, null, + + resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); resolver.start(mockListener); @@ -221,7 +238,7 @@ public class XdsNameResolverTest { @Test public void resolving_withTargetAuthorityNotFound() { - resolver = new XdsNameResolver( + resolver = new XdsNameResolver(targetUri, "notfound.google.com", AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); resolver.start(mockListener); @@ -243,7 +260,7 @@ public class XdsNameResolverTest { String serviceAuthority = "[::FFFF:129.144.52.38]:80"; expectedLdsResourceName = "[::FFFF:129.144.52.38]:80/id=1"; resolver = new XdsNameResolver( - null, serviceAuthority, null, serviceConfigParser, syncContext, + targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); resolver.start(mockListener); @@ -264,7 +281,7 @@ public class XdsNameResolverTest { "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/" + "%5B::FFFF:129.144.52.38%5D:80?id=1"; resolver = new XdsNameResolver( - null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler, + targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); resolver.start(mockListener); verify(mockListener, never()).onError(any(Status.class)); @@ -284,7 +301,7 @@ public class XdsNameResolverTest { "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/" + "path/to/service?id=1"; resolver = new XdsNameResolver( - null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler, + targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); @@ -311,7 +328,7 @@ public class XdsNameResolverTest { .build(); expectedLdsResourceName = "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/" + "%5B::FFFF:129.144.52.38%5D:80?bar=2&foo=1"; // query param canonified - resolver = new XdsNameResolver( + resolver = new XdsNameResolver(targetUri, "xds.authority.com", serviceAuthority, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); resolver.start(mockListener); @@ -343,7 +360,7 @@ public class XdsNameResolverTest { .clientDefaultListenerResourceNameTemplate("test-%s") .node(Node.newBuilder().build()) .build(); - resolver = new XdsNameResolver(null, AUTHORITY, null, + resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); // use different ldsResourceName and service authority. The virtualhost lookup should use @@ -524,7 +541,7 @@ public class XdsNameResolverTest { Collections.singletonList(route), ImmutableMap.of()); - resolver = new XdsNameResolver(null, AUTHORITY, "random", + resolver = new XdsNameResolver(targetUri, null, AUTHORITY, "random", serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); resolver.start(mockListener); @@ -547,7 +564,7 @@ public class XdsNameResolverTest { Collections.singletonList(route), ImmutableMap.of()); - resolver = new XdsNameResolver(null, AUTHORITY, "random", + resolver = new XdsNameResolver(targetUri, null, AUTHORITY, "random", serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); resolver.start(mockListener); @@ -558,7 +575,7 @@ public class XdsNameResolverTest { @Test public void resolving_matchingVirtualHostNotFoundForOverrideAuthority() { - resolver = new XdsNameResolver(null, AUTHORITY, AUTHORITY, + resolver = new XdsNameResolver(targetUri, null, AUTHORITY, AUTHORITY, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); resolver.start(mockListener); @@ -643,8 +660,8 @@ public class XdsNameResolverTest { public void retryPolicyInPerMethodConfigGeneratedByResolverIsValid() { ServiceConfigParser realParser = new ScParser( true, 5, 5, new AutoConfiguredLoadBalancerFactory("pick-first")); - resolver = new XdsNameResolver(null, AUTHORITY, null, realParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); + resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, realParser, syncContext, + scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); RetryPolicy retryPolicy = RetryPolicy.create( @@ -847,7 +864,7 @@ public class XdsNameResolverTest { resolver.shutdown(); reset(mockListener); when(mockRandom.nextLong()).thenReturn(123L); - resolver = new XdsNameResolver(null, AUTHORITY, null, serviceConfigParser, + resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); resolver.start(mockListener); @@ -1896,17 +1913,20 @@ public class XdsNameResolverTest { } private final class FakeXdsClientPoolFactory implements XdsClientPoolFactory { + Set targets = new HashSet<>(); + @Override public void setBootstrapOverride(Map bootstrap) {} @Override @Nullable - public ObjectPool get() { + public ObjectPool get(String target) { throw new UnsupportedOperationException("Should not be called"); } @Override - public ObjectPool getOrCreate() throws XdsInitializationException { + public ObjectPool getOrCreate(String target) throws XdsInitializationException { + targets.add(target); return new ObjectPool() { @Override public XdsClient getObject() { @@ -1919,6 +1939,16 @@ public class XdsNameResolverTest { } }; } + + @Override + public List getTargets() { + if (targets.isEmpty()) { + List targetList = new ArrayList<>(); + targetList.add(targetUri.toString()); + return targetList; + } + return new ArrayList<>(targets); + } } private class FakeXdsClient extends XdsClient { diff --git a/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java b/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java index 5d59e97335..791318c535 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java @@ -146,12 +146,12 @@ public class XdsServerTestHelper { @Override @Nullable - public ObjectPool get() { + public ObjectPool get(String target) { throw new UnsupportedOperationException("Should not be called"); } @Override - public ObjectPool getOrCreate() throws XdsInitializationException { + public ObjectPool getOrCreate(String target) throws XdsInitializationException { return new ObjectPool() { @Override public XdsClient getObject() { @@ -165,6 +165,11 @@ public class XdsServerTestHelper { } }; } + + @Override + public List getTargets() { + return Collections.singletonList("fake-target"); + } } static final class FakeXdsClient extends XdsClient { diff --git a/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java b/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java index c51327dc84..cc12e3863b 100644 --- a/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java +++ b/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java @@ -135,12 +135,15 @@ final class XdsTestControlPlaneService extends new Object[]{value.getResourceNamesList(), value.getErrorDetail()}); return; } + String resourceType = value.getTypeUrl(); - if (!value.getResponseNonce().isEmpty() - && !String.valueOf(xdsNonces.get(resourceType)).equals(value.getResponseNonce())) { + if (!value.getResponseNonce().isEmpty() && xdsNonces.containsKey(resourceType) + && !String.valueOf(xdsNonces.get(resourceType).get(responseObserver)) + .equals(value.getResponseNonce())) { logger.log(Level.FINE, "Resource nonce does not match, ignore."); return; } + Set requestedResourceNames = new HashSet<>(value.getResourceNamesList()); if (subscribers.get(resourceType).containsKey(responseObserver) && subscribers.get(resourceType).get(responseObserver) @@ -149,9 +152,11 @@ final class XdsTestControlPlaneService extends value.getResourceNamesList()); return; } + if (!xdsNonces.get(resourceType).containsKey(responseObserver)) { xdsNonces.get(resourceType).put(responseObserver, new AtomicInteger(0)); } + DiscoveryResponse response = generateResponse(resourceType, String.valueOf(xdsVersions.get(resourceType)), String.valueOf(xdsNonces.get(resourceType).get(responseObserver)),