diff --git a/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java b/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java index 9d5e2605ff..21bfe93732 100644 --- a/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java +++ b/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java @@ -159,6 +159,7 @@ final class EnvoyServerProtoData { * Corresponds to Envoy proto message {@link io.envoyproxy.envoy.api.v2.listener.FilterChain}. */ static final class FilterChain { + // TODO(sanjaypujare): flatten structure by moving FilterChainMatch class members here. private final FilterChainMatch filterChainMatch; // TODO(sanjaypujare): remove dependency on envoy data type along with rest of the code. private final io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext downstreamTlsContext; diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index f230d1d2e5..14d3a1afd5 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -62,22 +62,15 @@ abstract class XdsClient { */ static final class ConfigUpdate { private final String clusterName; - private final Listener listener; - private ConfigUpdate(String clusterName, @Nullable Listener listener) { + private ConfigUpdate(String clusterName) { this.clusterName = clusterName; - this.listener = listener; } String getClusterName() { return clusterName; } - @Nullable - public Listener getListener() { - return listener; - } - @Override public String toString() { return @@ -93,7 +86,6 @@ abstract class XdsClient { static final class Builder { private String clusterName; - @Nullable private Listener listener; // Use ConfigUpdate.newBuilder(). private Builder() { @@ -104,14 +96,9 @@ abstract class XdsClient { return this; } - Builder setListener(Listener listener) { - this.listener = listener; - return this; - } - ConfigUpdate build() { Preconditions.checkState(clusterName != null, "clusterName is not set"); - return new ConfigUpdate(clusterName, listener); + return new ConfigUpdate(clusterName); } } } @@ -352,6 +339,52 @@ abstract class XdsClient { } } + /** + * Updates via resource discovery RPCs using LDS. Includes {@link Listener} object containing + * config for security, RBAC or other server side features such as rate limit. + */ + static final class ListenerUpdate { + // TODO(sanjaypujare): flatten structure by moving Listener class members here. + private final Listener listener; + + private ListenerUpdate(Listener listener) { + this.listener = listener; + } + + public Listener getListener() { + return listener; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("listener", listener) + .toString(); + } + + static Builder newBuilder() { + return new Builder(); + } + + static final class Builder { + private Listener listener; + + // Use ListenerUpdate.newBuilder(). + private Builder() { + } + + Builder setListener(Listener listener) { + this.listener = listener; + return this; + } + + ListenerUpdate build() { + Preconditions.checkState(listener != null, "listener is not set"); + return new ListenerUpdate(listener); + } + } + } + /** * Config watcher interface. To be implemented by the xDS resolver. */ @@ -385,6 +418,19 @@ abstract class XdsClient { void onError(Status error); } + /** + * Listener watcher interface. To be used by {@link io.grpc.xds.internal.sds.XdsServerBuilder}. + */ + interface ListenerWatcher { + + /** + * Called when receiving an update on Listener configuration. + */ + void onListenerChanged(ListenerUpdate update); + + void onError(Status error); + } + /** * Shutdown this {@link XdsClient} and release resources. */ @@ -430,6 +476,12 @@ abstract class XdsClient { void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) { } + /** + * Registers a watcher for a Listener with the given port. + */ + void watchListenerData(int port, ListenerWatcher watcher) { + } + /** * Report client load stats to a remote server for the given cluster:cluster_service. * diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 2f55a7e890..bac3cd3c2b 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -26,6 +26,8 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.MessageOrBuilder; +import com.google.protobuf.Struct; +import com.google.protobuf.Value; import com.google.protobuf.util.JsonFormat; import com.google.rpc.Code; import io.envoyproxy.envoy.api.v2.Cluster; @@ -37,7 +39,10 @@ import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryResponse; import io.envoyproxy.envoy.api.v2.Listener; import io.envoyproxy.envoy.api.v2.RouteConfiguration; +import io.envoyproxy.envoy.api.v2.core.Address; import io.envoyproxy.envoy.api.v2.core.Node; +import io.envoyproxy.envoy.api.v2.listener.FilterChain; +import io.envoyproxy.envoy.api.v2.listener.FilterChainMatch; import io.envoyproxy.envoy.api.v2.route.Route; import io.envoyproxy.envoy.api.v2.route.VirtualHost; import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager; @@ -100,7 +105,7 @@ final class XdsClientImpl extends XdsClient { // The node identifier to be included in xDS requests. Management server only requires the // first request to carry the node identifier on a stream. It should be identical if present // more than once. - private final Node node; + private Node node; // Cached data for CDS responses, keyed by cluster names. // Optimization: cache ClusterUpdate, which contains only information needed by gRPC, instead @@ -154,13 +159,18 @@ final class XdsClientImpl extends XdsClient { private LoadReportClient lrsClient; // Following fields are set only after the ConfigWatcher registered. Once set, they should - // never change. + // never change. Only a ConfigWatcher or ListenerWatcher can be registered. @Nullable private ConfigWatcher configWatcher; // The "xds:" URI (including port suffix if present) that the gRPC client targets for. @Nullable private String ldsResourceName; + // only a ConfigWatcher or ListenerWatcher can be registered. + @Nullable + private ListenerWatcher listenerWatcher; + private int listenerPort = -1; + XdsClientImpl( String targetName, List servers, // list of management servers @@ -232,6 +242,7 @@ final class XdsClientImpl extends XdsClient { @Override void watchConfigData(String targetAuthority, ConfigWatcher watcher) { checkState(configWatcher == null, "watcher for %s already registered", targetAuthority); + checkState(listenerWatcher == null, "ListenerWatcher already registered"); ldsResourceName = checkNotNull(targetAuthority, "targetAuthority"); configWatcher = checkNotNull(watcher, "watcher"); logger.log(XdsLogLevel.INFO, "Started watching config {0}", ldsResourceName); @@ -406,6 +417,41 @@ final class XdsClientImpl extends XdsClient { } } + @Override + void watchListenerData(int port, ListenerWatcher watcher) { + checkState(configWatcher == null, + "ListenerWatcher cannot be set when ConfigWatcher set"); + checkState(listenerWatcher == null, "ListenerWatcher already registered"); + listenerWatcher = checkNotNull(watcher, "watcher"); + checkArgument(port > 0, "port needs to be > 0"); + this.listenerPort = port; + logger.log(XdsLogLevel.INFO, "Started watching listener for port {0}", port); + if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { + // Currently in retry backoff. + return; + } + if (adsStream == null) { + startRpcStream(); + } + updateNodeMetadataForListenerRequest(port); + adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of()); + ldsRespTimer = + syncContext + .schedule( + new ListenerResourceFetchTimeoutTask(":" + port), + INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService); + } + + /** In case of Listener watcher metadata to be updated to include port. */ + private void updateNodeMetadataForListenerRequest(int port) { + // TODO(sanjaypujare): fields of metadata to update to be finalized + Struct newMetadata = node.getMetadata().toBuilder() + .putFields("listener_inbound_port", + Value.newBuilder().setStringValue("" + port).build()) + .build(); + node = node.toBuilder().setMetadata(newMetadata).build(); + } + @Override void reportClientStats( String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) { @@ -463,6 +509,24 @@ final class XdsClientImpl extends XdsClient { adsStreamRetryStopwatch.reset().start(); } + /** + * Calls handleLdsResponseForListener or handleLdsResponseForConfigUpdate based on which watcher + * was set. + */ + private void handleLdsResponse(DiscoveryResponse ldsResponse) { + checkState((configWatcher != null) != (listenerWatcher != null), + "No LDS request was ever sent. Management server is doing something wrong"); + if (logger.isLoggable(XdsLogLevel.DEBUG)) { + logger.log( + XdsLogLevel.DEBUG, "Received LDS response:\n{0}", respPrinter.print(ldsResponse)); + } + if (listenerWatcher != null) { + handleLdsResponseForListener(ldsResponse); + } else { + handleLdsResponseForConfigUpdate(ldsResponse); + } + } + /** * Handles LDS response to find the HttpConnectionManager message for the requested resource name. * Proceed with the resolved RouteConfiguration in HttpConnectionManager message of the requested @@ -471,13 +535,9 @@ final class XdsClientImpl extends XdsClient { * resolution. The response is NACKed if contains invalid data for gRPC's usage. Otherwise, an * ACK request is sent to management server. */ - private void handleLdsResponse(DiscoveryResponse ldsResponse) { + private void handleLdsResponseForConfigUpdate(DiscoveryResponse ldsResponse) { checkState(ldsResourceName != null && configWatcher != null, - "No LDS request was ever sent. Management server is doing something wrong"); - if (logger.isLoggable(XdsLogLevel.DEBUG)) { - logger.log( - XdsLogLevel.DEBUG, "Received LDS response:\n{0}", respPrinter.print(ldsResponse)); - } + "LDS request for ConfigWatcher was never sent!"); // Unpack Listener messages. List listeners = new ArrayList<>(ldsResponse.getResourcesCount()); @@ -605,6 +665,72 @@ final class XdsClientImpl extends XdsClient { } } + private void handleLdsResponseForListener(DiscoveryResponse ldsResponse) { + checkState(ldsResourceName == null && listenerPort > 0 && listenerWatcher != null, + "LDS request for ListenerWatcher was never sent!"); + + // Unpack Listener messages. + Listener requestedListener = null; + logger.log(XdsLogLevel.DEBUG, "Listener count: {0}", ldsResponse.getResourcesCount()); + try { + for (com.google.protobuf.Any res : ldsResponse.getResourcesList()) { + Listener listener = res.unpack(Listener.class); + logger.log(XdsLogLevel.DEBUG, "Found listener {0}", listener.toString()); + if (isRequestedListener(listener)) { + requestedListener = listener; + logger.log(XdsLogLevel.DEBUG, "Requested listener found: {0}", listener.getName()); + } + } + } catch (InvalidProtocolBufferException e) { + logger.log(XdsLogLevel.WARNING, "Failed to unpack Listeners in LDS response {0}", e); + adsStream.sendNackRequest( + ADS_TYPE_URL_LDS, ImmutableList.of(), + ldsResponse.getVersionInfo(), "Malformed LDS response: " + e); + return; + } + adsStream.sendAckRequest(ADS_TYPE_URL_LDS, ImmutableList.of(), + ldsResponse.getVersionInfo()); + if (requestedListener != null) { + if (ldsRespTimer != null) { + ldsRespTimer.cancel(); + ldsRespTimer = null; + } + ListenerUpdate listenerUpdate = ListenerUpdate.newBuilder() + .setListener(EnvoyServerProtoData.Listener.fromEnvoyProtoListener(requestedListener)) + .build(); + listenerWatcher.onListenerChanged(listenerUpdate); + } else { + if (ldsRespTimer == null) { + listenerWatcher.onError(Status.NOT_FOUND.withDescription("did not find listener for " + + listenerPort)); + } + } + } + + private boolean isRequestedListener(Listener listener) { + // TODO(sanjaypujare): check listener.getName() once we know what xDS server returns + return isAddressMatching(listener.getAddress()) + && hasMatchingFilter(listener.getFilterChainsList()); + } + + private boolean isAddressMatching(Address address) { + // TODO(sanjaypujare): check IP address once we know xDS server will include it + return address.hasSocketAddress() + && (address.getSocketAddress().getPortValue() == listenerPort); + } + + private boolean hasMatchingFilter(List filterChainsList) { + // TODO(sanjaypujare): if myIp to be checked against filterChainMatch.getPrefixRangesList() + for (FilterChain filterChain : filterChainsList) { + FilterChainMatch filterChainMatch = filterChain.getFilterChainMatch(); + + if (listenerPort == filterChainMatch.getDestinationPort().getValue()) { + return true; + } + } + return false; + } + /** * Handles RDS response to find the RouteConfiguration message for the requested resource name. * Proceed with the resolved RouteConfiguration if exists to find the VirtualHost configuration @@ -1028,6 +1154,14 @@ final class XdsClientImpl extends XdsClient { new LdsResourceFetchTimeoutTask(ldsResourceName), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService); } + if (listenerWatcher != null) { + adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of()); + ldsRespTimer = + syncContext + .schedule( + new ListenerResourceFetchTimeoutTask(":" + listenerPort), + INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService); + } if (!clusterWatchers.isEmpty()) { adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet()); for (String clusterName : clusterWatchers.keySet()) { @@ -1164,6 +1298,9 @@ final class XdsClientImpl extends XdsClient { if (configWatcher != null) { configWatcher.onError(error); } + if (listenerWatcher != null) { + listenerWatcher.onError(error); + } for (Set watchers : clusterWatchers.values()) { for (ClusterWatcher watcher : watchers) { watcher.onError(error); @@ -1371,6 +1508,23 @@ final class XdsClientImpl extends XdsClient { } } + @VisibleForTesting + final class ListenerResourceFetchTimeoutTask extends ResourceFetchTimeoutTask { + + ListenerResourceFetchTimeoutTask(String resourceName) { + super(resourceName); + } + + @Override + public void run() { + super.run(); + ldsRespTimer = null; + listenerWatcher.onError( + Status.NOT_FOUND + .withDescription("Listener resource for port " + resourceName + " not found.")); + } + } + @VisibleForTesting final class RdsResourceFetchTimeoutTask extends ResourceFetchTimeoutTask { diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestForListener.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestForListener.java new file mode 100644 index 0000000000..77c0d0ba66 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestForListener.java @@ -0,0 +1,814 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryResponse; +import static io.grpc.xds.XdsClientTestHelper.buildListener; +import static io.grpc.xds.XdsClientTestHelper.buildRouteConfiguration; +import static io.grpc.xds.XdsClientTestHelper.buildVirtualHost; +import static org.junit.Assert.fail; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Any; +import com.google.protobuf.Struct; +import com.google.protobuf.UInt32Value; +import com.google.protobuf.Value; +import io.envoyproxy.envoy.api.v2.DiscoveryRequest; +import io.envoyproxy.envoy.api.v2.DiscoveryResponse; +import io.envoyproxy.envoy.api.v2.Listener; +import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext; +import io.envoyproxy.envoy.api.v2.core.Address; +import io.envoyproxy.envoy.api.v2.core.CidrRange; +import io.envoyproxy.envoy.api.v2.core.Node; +import io.envoyproxy.envoy.api.v2.core.SocketAddress; +import io.envoyproxy.envoy.api.v2.listener.Filter; +import io.envoyproxy.envoy.api.v2.listener.FilterChain; +import io.envoyproxy.envoy.api.v2.listener.FilterChainMatch; +import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager; +import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; +import io.grpc.Context; +import io.grpc.Context.CancellationListener; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.SynchronizationContext; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.BackoffPolicy; +import io.grpc.internal.FakeClock; +import io.grpc.internal.FakeClock.TaskFilter; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.xds.Bootstrapper.ChannelCreds; +import io.grpc.xds.Bootstrapper.ServerInfo; +import io.grpc.xds.XdsClient.ConfigWatcher; +import io.grpc.xds.XdsClient.ListenerUpdate; +import io.grpc.xds.XdsClient.ListenerWatcher; +import io.grpc.xds.XdsClient.XdsChannelFactory; +import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link XdsClientImpl for server side Listeners}. + */ +@RunWith(JUnit4.class) +public class XdsClientImplTestForListener { + + private static final int PORT = 7000; + private static final String LOCAL_IP = "192.168.3.5"; + private static final String DIFFERENT_IP = "192.168.3.6"; + private static final String TYPE_URL_HCM = + "type.googleapis.com/" + + "envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager"; + + private static final Node NODE = Node.getDefaultInstance(); + private static final FakeClock.TaskFilter RPC_RETRY_TASK_FILTER = + new FakeClock.TaskFilter() { + @Override + public boolean shouldAccept(Runnable command) { + return command.toString().contains(XdsClientImpl.RpcRetryTask.class.getSimpleName()); + } + }; + private static final TaskFilter LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER = + new TaskFilter() { + @Override + public boolean shouldAccept(Runnable command) { + return command.toString() + .contains(XdsClientImpl.ListenerResourceFetchTimeoutTask.class.getSimpleName()); + } + }; + private static final String LISTENER_NAME = "INBOUND_LISTENER"; + + @Rule + public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + private final FakeClock fakeClock = new FakeClock(); + + private final Queue> responseObservers = new ArrayDeque<>(); + private final Queue> requestObservers = new ArrayDeque<>(); + private final AtomicBoolean callEnded = new AtomicBoolean(true); + + @Mock + private AggregatedDiscoveryServiceImplBase mockedDiscoveryService; + @Mock + private BackoffPolicy.Provider backoffPolicyProvider; + @Mock + private BackoffPolicy backoffPolicy1; + @Mock + private BackoffPolicy backoffPolicy2; + @Mock + private ConfigWatcher configWatcher; + @Mock + private ListenerWatcher listenerWatcher; + + private ManagedChannel channel; + private XdsClientImpl xdsClient; + + @Before + public void setUp() throws IOException { + MockitoAnnotations.initMocks(this); + when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); + when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); + when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L); + + final String serverName = InProcessServerBuilder.generateName(); + AggregatedDiscoveryServiceImplBase adsServiceImpl = new AggregatedDiscoveryServiceImplBase() { + @Override + public StreamObserver streamAggregatedResources( + final StreamObserver responseObserver) { + assertThat(callEnded.get()).isTrue(); // ensure previous call was ended + callEnded.set(false); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context context) { + callEnded.set(true); + } + }, MoreExecutors.directExecutor()); + responseObservers.offer(responseObserver); + @SuppressWarnings("unchecked") + StreamObserver requestObserver = mock(StreamObserver.class); + requestObservers.offer(requestObserver); + return requestObserver; + } + }; + mockedDiscoveryService = + mock(AggregatedDiscoveryServiceImplBase.class, delegatesTo(adsServiceImpl)); + + cleanupRule.register( + InProcessServerBuilder + .forName(serverName) + .addService(mockedDiscoveryService) + .directExecutor() + .build() + .start()); + channel = + cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); + + List servers = + ImmutableList.of(new ServerInfo(serverName, ImmutableList.of())); + XdsChannelFactory channelFactory = new XdsChannelFactory() { + @Override + ManagedChannel createChannel(List servers) { + assertThat(Iterables.getOnlyElement(servers).getServerUri()).isEqualTo(serverName); + assertThat(Iterables.getOnlyElement(servers).getChannelCredentials()).isEmpty(); + return channel; + } + }; + + xdsClient = + new XdsClientImpl("", servers, channelFactory, NODE, syncContext, + fakeClock.getScheduledExecutorService(), backoffPolicyProvider, + fakeClock.getStopwatchSupplier()); + // Only the connection to management server is established, no RPC request is sent until at + // least one watcher is registered. + assertThat(responseObservers).isEmpty(); + assertThat(requestObservers).isEmpty(); + } + + @After + public void tearDown() { + xdsClient.shutdown(); + assertThat(callEnded.get()).isTrue(); + assertThat(channel.isShutdown()).isTrue(); + assertThat(fakeClock.getPendingTasks()).isEmpty(); + } + + private Node getNodeToVerify() { + Struct newMetadata = NODE.getMetadata().toBuilder() + .putFields("listener_inbound_port", + Value.newBuilder().setStringValue("" + PORT).build()) + .build(); + return NODE.toBuilder().setMetadata(newMetadata).build(); + } + + private static DiscoveryRequest buildDiscoveryRequest( + Node node, String versionInfo, String typeUrl, String nonce) { + return DiscoveryRequest.newBuilder() + .setVersionInfo(versionInfo) + .setNode(node) + .setTypeUrl(typeUrl) + .setResponseNonce(nonce) + .build(); + } + + /** Error when ConfigWatcher and then ListenerWatcher registered. */ + @Test + public void ldsResponse_configAndListenerWatcher_expectError() { + xdsClient.watchConfigData("somehost:80", configWatcher); + try { + xdsClient.watchListenerData(PORT, listenerWatcher); + fail("expected exception"); + } catch (IllegalStateException expected) { + assertThat(expected) + .hasMessageThat() + .isEqualTo("ListenerWatcher cannot be set when ConfigWatcher set"); + } + } + + /** Error when ListenerWatcher and then ConfigWatcher registered. */ + @Test + public void ldsResponse_listenerAndConfigWatcher_expectError() { + xdsClient.watchListenerData(PORT, listenerWatcher); + try { + xdsClient.watchConfigData("somehost:80", configWatcher); + fail("expected exception"); + } catch (IllegalStateException expected) { + assertThat(expected) + .hasMessageThat() + .isEqualTo("ListenerWatcher already registered"); + } + } + + /** Error when 2 ListenerWatchers registered. */ + @Test + public void ldsResponse_2listenerWatchers_expectError() { + xdsClient.watchListenerData(PORT, listenerWatcher); + try { + xdsClient.watchListenerData(80, listenerWatcher); + fail("expected exception"); + } catch (IllegalStateException expected) { + assertThat(expected) + .hasMessageThat() + .isEqualTo("ListenerWatcher already registered"); + } + } + + /** + * Client receives an LDS response that contains listener with no match i.e. no port match. + */ + @Test + public void ldsResponse_nonMatchingFilterChain_notFoundError() { + xdsClient.watchListenerData(PORT, listenerWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request with null in lds resource name + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); + + List listeners = ImmutableList.of( + Any.pack(buildListener("bar.googleapis.com", + Any.pack(HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration("route-bar.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("bar.googleapis.com"), + "cluster-bar.googleapis.com")))) + .build()))), + Any.pack(buildListener(LISTENER_NAME, + Any.pack(HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration("route-baz.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("baz.googleapis.com"), + "cluster-baz.googleapis.com")))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class)); + verify(listenerWatcher, never()).onError(any(Status.class)); + fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + ArgumentCaptor errorStatusCaptor = ArgumentCaptor.forClass(null); + verify(listenerWatcher).onError(errorStatusCaptor.capture()); + Status error = errorStatusCaptor.getValue(); + assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND); + assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + + /** Client receives a Listener with listener address and mismatched port. */ + @Test + public void ldsResponseWith_listenerAddressPortMismatch() { + xdsClient.watchListenerData(PORT, listenerWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request with null in lds resource name + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); + + final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null); + final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT, + CidrRange.newBuilder().setAddressPrefix(LOCAL_IP) + .setPrefixLen(UInt32Value.of(32)).build()), + CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default", + "ROOTCA"), + buildTestFilter("envoy.http_connection_manager")); + List listeners = ImmutableList.of( + Any.pack(buildListener("bar.googleapis.com", + Any.pack(HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration("route-bar.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("bar.googleapis.com"), + "cluster-bar.googleapis.com")))) + .build()))), + Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0", + filterChainOutbound, + filterChainInbound + ))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class)); + verify(listenerWatcher, never()).onError(any(Status.class)); + fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + ArgumentCaptor errorStatusCaptor = ArgumentCaptor.forClass(null); + verify(listenerWatcher).onError(errorStatusCaptor.capture()); + Status error = errorStatusCaptor.getValue(); + assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND); + assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + + /** Client receives a Listener with all match. */ + @Test + public void ldsResponseWith_matchingListenerFound() { + xdsClient.watchListenerData(PORT, listenerWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request with null in lds resource name + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); + + final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null); + final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT, + CidrRange.newBuilder().setAddressPrefix(LOCAL_IP) + .setPrefixLen(UInt32Value.of(32)).build()), + CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default", + "ROOTCA"), + buildTestFilter("envoy.http_connection_manager")); + List listeners = ImmutableList.of( + Any.pack(buildListener("bar.googleapis.com", + Any.pack(HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration("route-bar.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("bar.googleapis.com"), + "cluster-bar.googleapis.com")))) + .build()))), + Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0", + filterChainOutbound, + filterChainInbound + ))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + verify(listenerWatcher, never()).onError(any(Status.class)); + ArgumentCaptor listenerUpdateCaptor = ArgumentCaptor.forClass(null); + verify(listenerWatcher, times(1)).onListenerChanged(listenerUpdateCaptor.capture()); + ListenerUpdate configUpdate = listenerUpdateCaptor.getValue(); + EnvoyServerProtoData.Listener listener = configUpdate.getListener(); + assertThat(listener.getName()).isEqualTo(LISTENER_NAME); + assertThat(listener.getAddress()).isEqualTo("0.0.0.0:" + PORT); + EnvoyServerProtoData.FilterChain[] expected = new EnvoyServerProtoData.FilterChain[]{ + EnvoyServerProtoData.FilterChain.fromEnvoyProtoFilterChain(filterChainOutbound), + EnvoyServerProtoData.FilterChain.fromEnvoyProtoFilterChain(filterChainInbound) + }; + assertThat(listener.getFilterChains()).isEqualTo(Arrays.asList(expected)); + assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + + /** Client receives LDS responses for updating Listener previously received. */ + @Test + public void notifyUpdatedListener() { + xdsClient.watchListenerData(PORT, listenerWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request with null in lds resource name + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); + + final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null); + final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT, + CidrRange.newBuilder().setAddressPrefix(LOCAL_IP) + .setPrefixLen(UInt32Value.of(32)).build()), + CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default", + "ROOTCA"), + buildTestFilter("envoy.http_connection_manager")); + List listeners = ImmutableList.of( + Any.pack(buildListener("bar.googleapis.com", + Any.pack(HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration("route-bar.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("bar.googleapis.com"), + "cluster-bar.googleapis.com")))) + .build()))), + Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0", + filterChainOutbound, + filterChainInbound + ))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + verify(listenerWatcher, never()).onError(any(Status.class)); + ArgumentCaptor listenerUpdateCaptor = ArgumentCaptor.forClass(null); + verify(listenerWatcher, times(1)).onListenerChanged(listenerUpdateCaptor.capture()); + + // Management sends back another LDS response containing updates for the requested Listener. + final FilterChain filterChainNewInbound = buildFilterChain(buildFilterChainMatch(PORT, + CidrRange.newBuilder().setAddressPrefix(LOCAL_IP) + .setPrefixLen(UInt32Value.of(32)).build()), + CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default1", + "ROOTCA2"), + buildTestFilter("envoy.http_connection_manager")); + List listeners1 = ImmutableList.of( + Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0", + filterChainNewInbound + ))); + DiscoveryResponse response1 = + buildDiscoveryResponse("1", listeners1, XdsClientImpl.ADS_TYPE_URL_LDS, "0001"); + responseObserver.onNext(response1); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1", + XdsClientImpl.ADS_TYPE_URL_LDS, "0001"))); + + // Updated listener is notified to config watcher. + listenerUpdateCaptor = ArgumentCaptor.forClass(null); + verify(listenerWatcher, times(2)).onListenerChanged(listenerUpdateCaptor.capture()); + ListenerUpdate configUpdate = listenerUpdateCaptor.getValue(); + EnvoyServerProtoData.Listener listener = configUpdate.getListener(); + assertThat(listener.getName()).isEqualTo(LISTENER_NAME); + EnvoyServerProtoData.FilterChain[] expected = new EnvoyServerProtoData.FilterChain[]{ + EnvoyServerProtoData.FilterChain.fromEnvoyProtoFilterChain(filterChainNewInbound) + }; + assertThat(listener.getFilterChains()).isEqualTo(Arrays.asList(expected)); + } + + /** + * Client receives LDS response containing matching name but non-matching IP address. Test + * disabled until IP matching logic implemented. + */ + @Ignore + @Test + public void ldsResponse_nonMatchingIpAddress() { + xdsClient.watchListenerData(PORT, listenerWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request with null in lds resource name + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); + + final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(8000), null); + final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(PORT, + CidrRange.newBuilder().setAddressPrefix(DIFFERENT_IP) + .setPrefixLen(UInt32Value.of(32)).build()), + CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default", + "ROOTCA"), + buildTestFilter("envoy.http_connection_manager")); + List listeners = ImmutableList.of( + Any.pack(buildListener("bar.googleapis.com", + Any.pack(HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration("route-bar.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("bar.googleapis.com"), + "cluster-bar.googleapis.com")))) + .build()))), + Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0", + filterChainInbound, + filterChainOutbound + ))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + verify(listenerWatcher, never()).onError(any(Status.class)); + verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class)); + } + + /** Client receives LDS response containing non-matching port in the filterMatch. */ + @Test + public void ldsResponse_nonMatchingPort() { + xdsClient.watchListenerData(PORT, listenerWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request with null in lds resource name + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); + + final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(8000), null); + final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch( + PORT + 1, // add 1 to mismatch + CidrRange.newBuilder().setAddressPrefix(LOCAL_IP) + .setPrefixLen(UInt32Value.of(32)).build()), + CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default", + "ROOTCA"), + buildTestFilter("envoy.http_connection_manager")); + List listeners = ImmutableList.of( + Any.pack(buildListener("bar.googleapis.com", + Any.pack(HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration("route-bar.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("bar.googleapis.com"), + "cluster-bar.googleapis.com")))) + .build()))), + Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0", + filterChainInbound, + filterChainOutbound + ))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + verify(listenerWatcher, never()).onError(any(Status.class)); + verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class)); + } + + /** + * RPC stream close and retry while there is listener watcher registered. + */ + @Test + public void streamClosedAndRetry() { + InOrder inOrder = + Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + xdsClient.watchListenerData(PORT, listenerWatcher); + + ArgumentCaptor> responseObserverCaptor = + ArgumentCaptor.forClass(null); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + StreamObserver responseObserver = + responseObserverCaptor.getValue(); // same as responseObservers.poll() + StreamObserver requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null); + final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT, + CidrRange.newBuilder().setAddressPrefix(LOCAL_IP) + .setPrefixLen(UInt32Value.of(32)).build()), + CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default", + "ROOTCA"), + buildTestFilter("envoy.http_connection_manager")); + List listeners = ImmutableList.of( + Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0", + filterChainOutbound, + filterChainInbound + ))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(null); + + // Management server closes the RPC stream with an error. + responseObserver.onError(Status.UNKNOWN.asException()); + verify(listenerWatcher).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); + + // Resets backoff and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + // Retry resumes requests for all wanted resources. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server becomes unreachable. + responseObserver.onError(Status.UNAVAILABLE.asException()); + verify(listenerWatcher, times(2)).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(9L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server is still not reachable. + responseObserver.onError(Status.UNAVAILABLE.asException()); + verify(listenerWatcher, times(3)).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(99L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server sends back a LDS response. + response = buildDiscoveryResponse("1", listeners, + XdsClientImpl.ADS_TYPE_URL_LDS, "0001"); + responseObserver.onNext(response); + + // Client sent an LDS ACK request (Omitted). + + // Management server closes the RPC stream. + responseObserver.onCompleted(); + verify(listenerWatcher, times(4)).onError(any(Status.class)); + + // Resets backoff and retry immediately + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server becomes unreachable again. + responseObserver.onError(Status.UNAVAILABLE.asException()); + verify(listenerWatcher, times(5)).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(19L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + } + + static Listener buildListenerWithFilterChain(String name, int portValue, String address, + FilterChain... filterChains) { + Address listenerAddress = Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setPortValue(portValue).setAddress(address)) + .build(); + return + Listener.newBuilder() + .setName(name) + .setAddress(listenerAddress) + .addAllFilterChains(Arrays.asList(filterChains)) + .build(); + } + + @SuppressWarnings("deprecation") + static FilterChain buildFilterChain(FilterChainMatch filterChainMatch, + DownstreamTlsContext tlsContext, Filter...filters) { + return + FilterChain.newBuilder() + .setFilterChainMatch(filterChainMatch) + .setTlsContext(tlsContext == null + ? DownstreamTlsContext.getDefaultInstance() : tlsContext) + .addAllFilters(Arrays.asList(filters)) + .build(); + } + + static FilterChainMatch buildFilterChainMatch(int destPort, CidrRange...prefixRanges) { + return + FilterChainMatch.newBuilder() + .setDestinationPort(UInt32Value.of(destPort)) + .addAllPrefixRanges(Arrays.asList(prefixRanges)) + .build(); + } + + static Filter buildTestFilter(String name) { + return + Filter.newBuilder() + .setName(name) + .setTypedConfig( + Any.newBuilder() + .setTypeUrl(TYPE_URL_HCM)) + .build(); + } +} diff --git a/xds/src/test/java/io/grpc/xds/internal/sds/CommonTlsContextTestsUtil.java b/xds/src/test/java/io/grpc/xds/internal/sds/CommonTlsContextTestsUtil.java index fec1e8daf3..a7dc265298 100644 --- a/xds/src/test/java/io/grpc/xds/internal/sds/CommonTlsContextTestsUtil.java +++ b/xds/src/test/java/io/grpc/xds/internal/sds/CommonTlsContextTestsUtil.java @@ -128,15 +128,20 @@ public class CommonTlsContextTestsUtil { /** Helper method for creating DownstreamTlsContext values for tests. */ public static DownstreamTlsContext buildTestDownstreamTlsContext() { + return buildTestDownstreamTlsContext("google-sds-config-default", "ROOTCA"); + } + + /** Helper method for creating DownstreamTlsContext values with names. */ + public static DownstreamTlsContext buildTestDownstreamTlsContext( + String certName, String validationContextName) { return buildDownstreamTlsContext( buildCommonTlsContextWithAdditionalValues( - "google-sds-config-default", + certName, "unix:/var/run/sds/uds_path", - "ROOTCA", + validationContextName, "unix:/var/run/sds/uds_path", Arrays.asList("spiffe://grpc-sds-testing.svc.id.goog/ns/default/sa/bob"), Arrays.asList("managed-tls"), - null - )); + null)); } }