mirror of https://github.com/grpc/grpc-java.git
xds: add support for server side Listener watcher in XdsClient (#6801)
This commit is contained in:
parent
111e348def
commit
46eac47efc
|
@ -159,6 +159,7 @@ final class EnvoyServerProtoData {
|
|||
* Corresponds to Envoy proto message {@link io.envoyproxy.envoy.api.v2.listener.FilterChain}.
|
||||
*/
|
||||
static final class FilterChain {
|
||||
// TODO(sanjaypujare): flatten structure by moving FilterChainMatch class members here.
|
||||
private final FilterChainMatch filterChainMatch;
|
||||
// TODO(sanjaypujare): remove dependency on envoy data type along with rest of the code.
|
||||
private final io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext downstreamTlsContext;
|
||||
|
|
|
@ -62,22 +62,15 @@ abstract class XdsClient {
|
|||
*/
|
||||
static final class ConfigUpdate {
|
||||
private final String clusterName;
|
||||
private final Listener listener;
|
||||
|
||||
private ConfigUpdate(String clusterName, @Nullable Listener listener) {
|
||||
private ConfigUpdate(String clusterName) {
|
||||
this.clusterName = clusterName;
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
String getClusterName() {
|
||||
return clusterName;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public Listener getListener() {
|
||||
return listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return
|
||||
|
@ -93,7 +86,6 @@ abstract class XdsClient {
|
|||
|
||||
static final class Builder {
|
||||
private String clusterName;
|
||||
@Nullable private Listener listener;
|
||||
|
||||
// Use ConfigUpdate.newBuilder().
|
||||
private Builder() {
|
||||
|
@ -104,14 +96,9 @@ abstract class XdsClient {
|
|||
return this;
|
||||
}
|
||||
|
||||
Builder setListener(Listener listener) {
|
||||
this.listener = listener;
|
||||
return this;
|
||||
}
|
||||
|
||||
ConfigUpdate build() {
|
||||
Preconditions.checkState(clusterName != null, "clusterName is not set");
|
||||
return new ConfigUpdate(clusterName, listener);
|
||||
return new ConfigUpdate(clusterName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -352,6 +339,52 @@ abstract class XdsClient {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates via resource discovery RPCs using LDS. Includes {@link Listener} object containing
|
||||
* config for security, RBAC or other server side features such as rate limit.
|
||||
*/
|
||||
static final class ListenerUpdate {
|
||||
// TODO(sanjaypujare): flatten structure by moving Listener class members here.
|
||||
private final Listener listener;
|
||||
|
||||
private ListenerUpdate(Listener listener) {
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
public Listener getListener() {
|
||||
return listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("listener", listener)
|
||||
.toString();
|
||||
}
|
||||
|
||||
static Builder newBuilder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
static final class Builder {
|
||||
private Listener listener;
|
||||
|
||||
// Use ListenerUpdate.newBuilder().
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
Builder setListener(Listener listener) {
|
||||
this.listener = listener;
|
||||
return this;
|
||||
}
|
||||
|
||||
ListenerUpdate build() {
|
||||
Preconditions.checkState(listener != null, "listener is not set");
|
||||
return new ListenerUpdate(listener);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Config watcher interface. To be implemented by the xDS resolver.
|
||||
*/
|
||||
|
@ -385,6 +418,19 @@ abstract class XdsClient {
|
|||
void onError(Status error);
|
||||
}
|
||||
|
||||
/**
|
||||
* Listener watcher interface. To be used by {@link io.grpc.xds.internal.sds.XdsServerBuilder}.
|
||||
*/
|
||||
interface ListenerWatcher {
|
||||
|
||||
/**
|
||||
* Called when receiving an update on Listener configuration.
|
||||
*/
|
||||
void onListenerChanged(ListenerUpdate update);
|
||||
|
||||
void onError(Status error);
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown this {@link XdsClient} and release resources.
|
||||
*/
|
||||
|
@ -430,6 +476,12 @@ abstract class XdsClient {
|
|||
void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a watcher for a Listener with the given port.
|
||||
*/
|
||||
void watchListenerData(int port, ListenerWatcher watcher) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Report client load stats to a remote server for the given cluster:cluster_service.
|
||||
*
|
||||
|
|
|
@ -26,6 +26,8 @@ import com.google.common.base.Supplier;
|
|||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import com.google.protobuf.MessageOrBuilder;
|
||||
import com.google.protobuf.Struct;
|
||||
import com.google.protobuf.Value;
|
||||
import com.google.protobuf.util.JsonFormat;
|
||||
import com.google.rpc.Code;
|
||||
import io.envoyproxy.envoy.api.v2.Cluster;
|
||||
|
@ -37,7 +39,10 @@ import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
|
|||
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
|
||||
import io.envoyproxy.envoy.api.v2.Listener;
|
||||
import io.envoyproxy.envoy.api.v2.RouteConfiguration;
|
||||
import io.envoyproxy.envoy.api.v2.core.Address;
|
||||
import io.envoyproxy.envoy.api.v2.core.Node;
|
||||
import io.envoyproxy.envoy.api.v2.listener.FilterChain;
|
||||
import io.envoyproxy.envoy.api.v2.listener.FilterChainMatch;
|
||||
import io.envoyproxy.envoy.api.v2.route.Route;
|
||||
import io.envoyproxy.envoy.api.v2.route.VirtualHost;
|
||||
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager;
|
||||
|
@ -100,7 +105,7 @@ final class XdsClientImpl extends XdsClient {
|
|||
// The node identifier to be included in xDS requests. Management server only requires the
|
||||
// first request to carry the node identifier on a stream. It should be identical if present
|
||||
// more than once.
|
||||
private final Node node;
|
||||
private Node node;
|
||||
|
||||
// Cached data for CDS responses, keyed by cluster names.
|
||||
// Optimization: cache ClusterUpdate, which contains only information needed by gRPC, instead
|
||||
|
@ -154,13 +159,18 @@ final class XdsClientImpl extends XdsClient {
|
|||
private LoadReportClient lrsClient;
|
||||
|
||||
// Following fields are set only after the ConfigWatcher registered. Once set, they should
|
||||
// never change.
|
||||
// never change. Only a ConfigWatcher or ListenerWatcher can be registered.
|
||||
@Nullable
|
||||
private ConfigWatcher configWatcher;
|
||||
// The "xds:" URI (including port suffix if present) that the gRPC client targets for.
|
||||
@Nullable
|
||||
private String ldsResourceName;
|
||||
|
||||
// only a ConfigWatcher or ListenerWatcher can be registered.
|
||||
@Nullable
|
||||
private ListenerWatcher listenerWatcher;
|
||||
private int listenerPort = -1;
|
||||
|
||||
XdsClientImpl(
|
||||
String targetName,
|
||||
List<ServerInfo> servers, // list of management servers
|
||||
|
@ -232,6 +242,7 @@ final class XdsClientImpl extends XdsClient {
|
|||
@Override
|
||||
void watchConfigData(String targetAuthority, ConfigWatcher watcher) {
|
||||
checkState(configWatcher == null, "watcher for %s already registered", targetAuthority);
|
||||
checkState(listenerWatcher == null, "ListenerWatcher already registered");
|
||||
ldsResourceName = checkNotNull(targetAuthority, "targetAuthority");
|
||||
configWatcher = checkNotNull(watcher, "watcher");
|
||||
logger.log(XdsLogLevel.INFO, "Started watching config {0}", ldsResourceName);
|
||||
|
@ -406,6 +417,41 @@ final class XdsClientImpl extends XdsClient {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void watchListenerData(int port, ListenerWatcher watcher) {
|
||||
checkState(configWatcher == null,
|
||||
"ListenerWatcher cannot be set when ConfigWatcher set");
|
||||
checkState(listenerWatcher == null, "ListenerWatcher already registered");
|
||||
listenerWatcher = checkNotNull(watcher, "watcher");
|
||||
checkArgument(port > 0, "port needs to be > 0");
|
||||
this.listenerPort = port;
|
||||
logger.log(XdsLogLevel.INFO, "Started watching listener for port {0}", port);
|
||||
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
|
||||
// Currently in retry backoff.
|
||||
return;
|
||||
}
|
||||
if (adsStream == null) {
|
||||
startRpcStream();
|
||||
}
|
||||
updateNodeMetadataForListenerRequest(port);
|
||||
adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.<String>of());
|
||||
ldsRespTimer =
|
||||
syncContext
|
||||
.schedule(
|
||||
new ListenerResourceFetchTimeoutTask(":" + port),
|
||||
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
|
||||
}
|
||||
|
||||
/** In case of Listener watcher metadata to be updated to include port. */
|
||||
private void updateNodeMetadataForListenerRequest(int port) {
|
||||
// TODO(sanjaypujare): fields of metadata to update to be finalized
|
||||
Struct newMetadata = node.getMetadata().toBuilder()
|
||||
.putFields("listener_inbound_port",
|
||||
Value.newBuilder().setStringValue("" + port).build())
|
||||
.build();
|
||||
node = node.toBuilder().setMetadata(newMetadata).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
void reportClientStats(
|
||||
String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) {
|
||||
|
@ -463,6 +509,24 @@ final class XdsClientImpl extends XdsClient {
|
|||
adsStreamRetryStopwatch.reset().start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls handleLdsResponseForListener or handleLdsResponseForConfigUpdate based on which watcher
|
||||
* was set.
|
||||
*/
|
||||
private void handleLdsResponse(DiscoveryResponse ldsResponse) {
|
||||
checkState((configWatcher != null) != (listenerWatcher != null),
|
||||
"No LDS request was ever sent. Management server is doing something wrong");
|
||||
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
|
||||
logger.log(
|
||||
XdsLogLevel.DEBUG, "Received LDS response:\n{0}", respPrinter.print(ldsResponse));
|
||||
}
|
||||
if (listenerWatcher != null) {
|
||||
handleLdsResponseForListener(ldsResponse);
|
||||
} else {
|
||||
handleLdsResponseForConfigUpdate(ldsResponse);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles LDS response to find the HttpConnectionManager message for the requested resource name.
|
||||
* Proceed with the resolved RouteConfiguration in HttpConnectionManager message of the requested
|
||||
|
@ -471,13 +535,9 @@ final class XdsClientImpl extends XdsClient {
|
|||
* resolution. The response is NACKed if contains invalid data for gRPC's usage. Otherwise, an
|
||||
* ACK request is sent to management server.
|
||||
*/
|
||||
private void handleLdsResponse(DiscoveryResponse ldsResponse) {
|
||||
private void handleLdsResponseForConfigUpdate(DiscoveryResponse ldsResponse) {
|
||||
checkState(ldsResourceName != null && configWatcher != null,
|
||||
"No LDS request was ever sent. Management server is doing something wrong");
|
||||
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
|
||||
logger.log(
|
||||
XdsLogLevel.DEBUG, "Received LDS response:\n{0}", respPrinter.print(ldsResponse));
|
||||
}
|
||||
"LDS request for ConfigWatcher was never sent!");
|
||||
|
||||
// Unpack Listener messages.
|
||||
List<Listener> listeners = new ArrayList<>(ldsResponse.getResourcesCount());
|
||||
|
@ -605,6 +665,72 @@ final class XdsClientImpl extends XdsClient {
|
|||
}
|
||||
}
|
||||
|
||||
private void handleLdsResponseForListener(DiscoveryResponse ldsResponse) {
|
||||
checkState(ldsResourceName == null && listenerPort > 0 && listenerWatcher != null,
|
||||
"LDS request for ListenerWatcher was never sent!");
|
||||
|
||||
// Unpack Listener messages.
|
||||
Listener requestedListener = null;
|
||||
logger.log(XdsLogLevel.DEBUG, "Listener count: {0}", ldsResponse.getResourcesCount());
|
||||
try {
|
||||
for (com.google.protobuf.Any res : ldsResponse.getResourcesList()) {
|
||||
Listener listener = res.unpack(Listener.class);
|
||||
logger.log(XdsLogLevel.DEBUG, "Found listener {0}", listener.toString());
|
||||
if (isRequestedListener(listener)) {
|
||||
requestedListener = listener;
|
||||
logger.log(XdsLogLevel.DEBUG, "Requested listener found: {0}", listener.getName());
|
||||
}
|
||||
}
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
logger.log(XdsLogLevel.WARNING, "Failed to unpack Listeners in LDS response {0}", e);
|
||||
adsStream.sendNackRequest(
|
||||
ADS_TYPE_URL_LDS, ImmutableList.<String>of(),
|
||||
ldsResponse.getVersionInfo(), "Malformed LDS response: " + e);
|
||||
return;
|
||||
}
|
||||
adsStream.sendAckRequest(ADS_TYPE_URL_LDS, ImmutableList.<String>of(),
|
||||
ldsResponse.getVersionInfo());
|
||||
if (requestedListener != null) {
|
||||
if (ldsRespTimer != null) {
|
||||
ldsRespTimer.cancel();
|
||||
ldsRespTimer = null;
|
||||
}
|
||||
ListenerUpdate listenerUpdate = ListenerUpdate.newBuilder()
|
||||
.setListener(EnvoyServerProtoData.Listener.fromEnvoyProtoListener(requestedListener))
|
||||
.build();
|
||||
listenerWatcher.onListenerChanged(listenerUpdate);
|
||||
} else {
|
||||
if (ldsRespTimer == null) {
|
||||
listenerWatcher.onError(Status.NOT_FOUND.withDescription("did not find listener for "
|
||||
+ listenerPort));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isRequestedListener(Listener listener) {
|
||||
// TODO(sanjaypujare): check listener.getName() once we know what xDS server returns
|
||||
return isAddressMatching(listener.getAddress())
|
||||
&& hasMatchingFilter(listener.getFilterChainsList());
|
||||
}
|
||||
|
||||
private boolean isAddressMatching(Address address) {
|
||||
// TODO(sanjaypujare): check IP address once we know xDS server will include it
|
||||
return address.hasSocketAddress()
|
||||
&& (address.getSocketAddress().getPortValue() == listenerPort);
|
||||
}
|
||||
|
||||
private boolean hasMatchingFilter(List<FilterChain> filterChainsList) {
|
||||
// TODO(sanjaypujare): if myIp to be checked against filterChainMatch.getPrefixRangesList()
|
||||
for (FilterChain filterChain : filterChainsList) {
|
||||
FilterChainMatch filterChainMatch = filterChain.getFilterChainMatch();
|
||||
|
||||
if (listenerPort == filterChainMatch.getDestinationPort().getValue()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles RDS response to find the RouteConfiguration message for the requested resource name.
|
||||
* Proceed with the resolved RouteConfiguration if exists to find the VirtualHost configuration
|
||||
|
@ -1028,6 +1154,14 @@ final class XdsClientImpl extends XdsClient {
|
|||
new LdsResourceFetchTimeoutTask(ldsResourceName),
|
||||
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
|
||||
}
|
||||
if (listenerWatcher != null) {
|
||||
adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.<String>of());
|
||||
ldsRespTimer =
|
||||
syncContext
|
||||
.schedule(
|
||||
new ListenerResourceFetchTimeoutTask(":" + listenerPort),
|
||||
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
|
||||
}
|
||||
if (!clusterWatchers.isEmpty()) {
|
||||
adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet());
|
||||
for (String clusterName : clusterWatchers.keySet()) {
|
||||
|
@ -1164,6 +1298,9 @@ final class XdsClientImpl extends XdsClient {
|
|||
if (configWatcher != null) {
|
||||
configWatcher.onError(error);
|
||||
}
|
||||
if (listenerWatcher != null) {
|
||||
listenerWatcher.onError(error);
|
||||
}
|
||||
for (Set<ClusterWatcher> watchers : clusterWatchers.values()) {
|
||||
for (ClusterWatcher watcher : watchers) {
|
||||
watcher.onError(error);
|
||||
|
@ -1371,6 +1508,23 @@ final class XdsClientImpl extends XdsClient {
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
final class ListenerResourceFetchTimeoutTask extends ResourceFetchTimeoutTask {
|
||||
|
||||
ListenerResourceFetchTimeoutTask(String resourceName) {
|
||||
super(resourceName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
super.run();
|
||||
ldsRespTimer = null;
|
||||
listenerWatcher.onError(
|
||||
Status.NOT_FOUND
|
||||
.withDescription("Listener resource for port " + resourceName + " not found."));
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
final class RdsResourceFetchTimeoutTask extends ResourceFetchTimeoutTask {
|
||||
|
||||
|
|
|
@ -0,0 +1,814 @@
|
|||
/*
|
||||
* Copyright 2020 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryResponse;
|
||||
import static io.grpc.xds.XdsClientTestHelper.buildListener;
|
||||
import static io.grpc.xds.XdsClientTestHelper.buildRouteConfiguration;
|
||||
import static io.grpc.xds.XdsClientTestHelper.buildVirtualHost;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.AdditionalAnswers.delegatesTo;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.protobuf.Any;
|
||||
import com.google.protobuf.Struct;
|
||||
import com.google.protobuf.UInt32Value;
|
||||
import com.google.protobuf.Value;
|
||||
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
|
||||
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
|
||||
import io.envoyproxy.envoy.api.v2.Listener;
|
||||
import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext;
|
||||
import io.envoyproxy.envoy.api.v2.core.Address;
|
||||
import io.envoyproxy.envoy.api.v2.core.CidrRange;
|
||||
import io.envoyproxy.envoy.api.v2.core.Node;
|
||||
import io.envoyproxy.envoy.api.v2.core.SocketAddress;
|
||||
import io.envoyproxy.envoy.api.v2.listener.Filter;
|
||||
import io.envoyproxy.envoy.api.v2.listener.FilterChain;
|
||||
import io.envoyproxy.envoy.api.v2.listener.FilterChainMatch;
|
||||
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager;
|
||||
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
|
||||
import io.grpc.Context;
|
||||
import io.grpc.Context.CancellationListener;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.Status.Code;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.inprocess.InProcessChannelBuilder;
|
||||
import io.grpc.inprocess.InProcessServerBuilder;
|
||||
import io.grpc.internal.BackoffPolicy;
|
||||
import io.grpc.internal.FakeClock;
|
||||
import io.grpc.internal.FakeClock.TaskFilter;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import io.grpc.testing.GrpcCleanupRule;
|
||||
import io.grpc.xds.Bootstrapper.ChannelCreds;
|
||||
import io.grpc.xds.Bootstrapper.ServerInfo;
|
||||
import io.grpc.xds.XdsClient.ConfigWatcher;
|
||||
import io.grpc.xds.XdsClient.ListenerUpdate;
|
||||
import io.grpc.xds.XdsClient.ListenerWatcher;
|
||||
import io.grpc.xds.XdsClient.XdsChannelFactory;
|
||||
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.InOrder;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
/**
|
||||
* Tests for {@link XdsClientImpl for server side Listeners}.
|
||||
*/
|
||||
@RunWith(JUnit4.class)
|
||||
public class XdsClientImplTestForListener {
|
||||
|
||||
private static final int PORT = 7000;
|
||||
private static final String LOCAL_IP = "192.168.3.5";
|
||||
private static final String DIFFERENT_IP = "192.168.3.6";
|
||||
private static final String TYPE_URL_HCM =
|
||||
"type.googleapis.com/"
|
||||
+ "envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager";
|
||||
|
||||
private static final Node NODE = Node.getDefaultInstance();
|
||||
private static final FakeClock.TaskFilter RPC_RETRY_TASK_FILTER =
|
||||
new FakeClock.TaskFilter() {
|
||||
@Override
|
||||
public boolean shouldAccept(Runnable command) {
|
||||
return command.toString().contains(XdsClientImpl.RpcRetryTask.class.getSimpleName());
|
||||
}
|
||||
};
|
||||
private static final TaskFilter LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER =
|
||||
new TaskFilter() {
|
||||
@Override
|
||||
public boolean shouldAccept(Runnable command) {
|
||||
return command.toString()
|
||||
.contains(XdsClientImpl.ListenerResourceFetchTimeoutTask.class.getSimpleName());
|
||||
}
|
||||
};
|
||||
private static final String LISTENER_NAME = "INBOUND_LISTENER";
|
||||
|
||||
@Rule
|
||||
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
|
||||
|
||||
private final SynchronizationContext syncContext = new SynchronizationContext(
|
||||
new Thread.UncaughtExceptionHandler() {
|
||||
@Override
|
||||
public void uncaughtException(Thread t, Throwable e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
});
|
||||
private final FakeClock fakeClock = new FakeClock();
|
||||
|
||||
private final Queue<StreamObserver<DiscoveryResponse>> responseObservers = new ArrayDeque<>();
|
||||
private final Queue<StreamObserver<DiscoveryRequest>> requestObservers = new ArrayDeque<>();
|
||||
private final AtomicBoolean callEnded = new AtomicBoolean(true);
|
||||
|
||||
@Mock
|
||||
private AggregatedDiscoveryServiceImplBase mockedDiscoveryService;
|
||||
@Mock
|
||||
private BackoffPolicy.Provider backoffPolicyProvider;
|
||||
@Mock
|
||||
private BackoffPolicy backoffPolicy1;
|
||||
@Mock
|
||||
private BackoffPolicy backoffPolicy2;
|
||||
@Mock
|
||||
private ConfigWatcher configWatcher;
|
||||
@Mock
|
||||
private ListenerWatcher listenerWatcher;
|
||||
|
||||
private ManagedChannel channel;
|
||||
private XdsClientImpl xdsClient;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
|
||||
when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
|
||||
when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L);
|
||||
|
||||
final String serverName = InProcessServerBuilder.generateName();
|
||||
AggregatedDiscoveryServiceImplBase adsServiceImpl = new AggregatedDiscoveryServiceImplBase() {
|
||||
@Override
|
||||
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
|
||||
final StreamObserver<DiscoveryResponse> responseObserver) {
|
||||
assertThat(callEnded.get()).isTrue(); // ensure previous call was ended
|
||||
callEnded.set(false);
|
||||
Context.current().addListener(
|
||||
new CancellationListener() {
|
||||
@Override
|
||||
public void cancelled(Context context) {
|
||||
callEnded.set(true);
|
||||
}
|
||||
}, MoreExecutors.directExecutor());
|
||||
responseObservers.offer(responseObserver);
|
||||
@SuppressWarnings("unchecked")
|
||||
StreamObserver<DiscoveryRequest> requestObserver = mock(StreamObserver.class);
|
||||
requestObservers.offer(requestObserver);
|
||||
return requestObserver;
|
||||
}
|
||||
};
|
||||
mockedDiscoveryService =
|
||||
mock(AggregatedDiscoveryServiceImplBase.class, delegatesTo(adsServiceImpl));
|
||||
|
||||
cleanupRule.register(
|
||||
InProcessServerBuilder
|
||||
.forName(serverName)
|
||||
.addService(mockedDiscoveryService)
|
||||
.directExecutor()
|
||||
.build()
|
||||
.start());
|
||||
channel =
|
||||
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
|
||||
|
||||
List<ServerInfo> servers =
|
||||
ImmutableList.of(new ServerInfo(serverName, ImmutableList.<ChannelCreds>of()));
|
||||
XdsChannelFactory channelFactory = new XdsChannelFactory() {
|
||||
@Override
|
||||
ManagedChannel createChannel(List<ServerInfo> servers) {
|
||||
assertThat(Iterables.getOnlyElement(servers).getServerUri()).isEqualTo(serverName);
|
||||
assertThat(Iterables.getOnlyElement(servers).getChannelCredentials()).isEmpty();
|
||||
return channel;
|
||||
}
|
||||
};
|
||||
|
||||
xdsClient =
|
||||
new XdsClientImpl("", servers, channelFactory, NODE, syncContext,
|
||||
fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
|
||||
fakeClock.getStopwatchSupplier());
|
||||
// Only the connection to management server is established, no RPC request is sent until at
|
||||
// least one watcher is registered.
|
||||
assertThat(responseObservers).isEmpty();
|
||||
assertThat(requestObservers).isEmpty();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
xdsClient.shutdown();
|
||||
assertThat(callEnded.get()).isTrue();
|
||||
assertThat(channel.isShutdown()).isTrue();
|
||||
assertThat(fakeClock.getPendingTasks()).isEmpty();
|
||||
}
|
||||
|
||||
private Node getNodeToVerify() {
|
||||
Struct newMetadata = NODE.getMetadata().toBuilder()
|
||||
.putFields("listener_inbound_port",
|
||||
Value.newBuilder().setStringValue("" + PORT).build())
|
||||
.build();
|
||||
return NODE.toBuilder().setMetadata(newMetadata).build();
|
||||
}
|
||||
|
||||
private static DiscoveryRequest buildDiscoveryRequest(
|
||||
Node node, String versionInfo, String typeUrl, String nonce) {
|
||||
return DiscoveryRequest.newBuilder()
|
||||
.setVersionInfo(versionInfo)
|
||||
.setNode(node)
|
||||
.setTypeUrl(typeUrl)
|
||||
.setResponseNonce(nonce)
|
||||
.build();
|
||||
}
|
||||
|
||||
/** Error when ConfigWatcher and then ListenerWatcher registered. */
|
||||
@Test
|
||||
public void ldsResponse_configAndListenerWatcher_expectError() {
|
||||
xdsClient.watchConfigData("somehost:80", configWatcher);
|
||||
try {
|
||||
xdsClient.watchListenerData(PORT, listenerWatcher);
|
||||
fail("expected exception");
|
||||
} catch (IllegalStateException expected) {
|
||||
assertThat(expected)
|
||||
.hasMessageThat()
|
||||
.isEqualTo("ListenerWatcher cannot be set when ConfigWatcher set");
|
||||
}
|
||||
}
|
||||
|
||||
/** Error when ListenerWatcher and then ConfigWatcher registered. */
|
||||
@Test
|
||||
public void ldsResponse_listenerAndConfigWatcher_expectError() {
|
||||
xdsClient.watchListenerData(PORT, listenerWatcher);
|
||||
try {
|
||||
xdsClient.watchConfigData("somehost:80", configWatcher);
|
||||
fail("expected exception");
|
||||
} catch (IllegalStateException expected) {
|
||||
assertThat(expected)
|
||||
.hasMessageThat()
|
||||
.isEqualTo("ListenerWatcher already registered");
|
||||
}
|
||||
}
|
||||
|
||||
/** Error when 2 ListenerWatchers registered. */
|
||||
@Test
|
||||
public void ldsResponse_2listenerWatchers_expectError() {
|
||||
xdsClient.watchListenerData(PORT, listenerWatcher);
|
||||
try {
|
||||
xdsClient.watchListenerData(80, listenerWatcher);
|
||||
fail("expected exception");
|
||||
} catch (IllegalStateException expected) {
|
||||
assertThat(expected)
|
||||
.hasMessageThat()
|
||||
.isEqualTo("ListenerWatcher already registered");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Client receives an LDS response that contains listener with no match i.e. no port match.
|
||||
*/
|
||||
@Test
|
||||
public void ldsResponse_nonMatchingFilterChain_notFoundError() {
|
||||
xdsClient.watchListenerData(PORT, listenerWatcher);
|
||||
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
|
||||
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
|
||||
|
||||
// Client sends an LDS request with null in lds resource name
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
|
||||
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
|
||||
|
||||
List<Any> listeners = ImmutableList.of(
|
||||
Any.pack(buildListener("bar.googleapis.com",
|
||||
Any.pack(HttpConnectionManager.newBuilder()
|
||||
.setRouteConfig(
|
||||
buildRouteConfiguration("route-bar.googleapis.com",
|
||||
ImmutableList.of(
|
||||
buildVirtualHost(
|
||||
ImmutableList.of("bar.googleapis.com"),
|
||||
"cluster-bar.googleapis.com"))))
|
||||
.build()))),
|
||||
Any.pack(buildListener(LISTENER_NAME,
|
||||
Any.pack(HttpConnectionManager.newBuilder()
|
||||
.setRouteConfig(
|
||||
buildRouteConfiguration("route-baz.googleapis.com",
|
||||
ImmutableList.of(
|
||||
buildVirtualHost(
|
||||
ImmutableList.of("baz.googleapis.com"),
|
||||
"cluster-baz.googleapis.com"))))
|
||||
.build()))));
|
||||
DiscoveryResponse response =
|
||||
buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
|
||||
responseObserver.onNext(response);
|
||||
|
||||
// Client sends an ACK LDS request.
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
|
||||
|
||||
verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
|
||||
verify(listenerWatcher, never()).onError(any(Status.class));
|
||||
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
|
||||
ArgumentCaptor<Status> errorStatusCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(listenerWatcher).onError(errorStatusCaptor.capture());
|
||||
Status error = errorStatusCaptor.getValue();
|
||||
assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND);
|
||||
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
|
||||
}
|
||||
|
||||
/** Client receives a Listener with listener address and mismatched port. */
|
||||
@Test
|
||||
public void ldsResponseWith_listenerAddressPortMismatch() {
|
||||
xdsClient.watchListenerData(PORT, listenerWatcher);
|
||||
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
|
||||
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
|
||||
|
||||
// Client sends an LDS request with null in lds resource name
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
|
||||
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
|
||||
|
||||
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
|
||||
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
|
||||
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
|
||||
.setPrefixLen(UInt32Value.of(32)).build()),
|
||||
CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default",
|
||||
"ROOTCA"),
|
||||
buildTestFilter("envoy.http_connection_manager"));
|
||||
List<Any> listeners = ImmutableList.of(
|
||||
Any.pack(buildListener("bar.googleapis.com",
|
||||
Any.pack(HttpConnectionManager.newBuilder()
|
||||
.setRouteConfig(
|
||||
buildRouteConfiguration("route-bar.googleapis.com",
|
||||
ImmutableList.of(
|
||||
buildVirtualHost(
|
||||
ImmutableList.of("bar.googleapis.com"),
|
||||
"cluster-bar.googleapis.com"))))
|
||||
.build()))),
|
||||
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
|
||||
filterChainOutbound,
|
||||
filterChainInbound
|
||||
)));
|
||||
DiscoveryResponse response =
|
||||
buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
|
||||
responseObserver.onNext(response);
|
||||
|
||||
// Client sends an ACK LDS request.
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
|
||||
|
||||
verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
|
||||
verify(listenerWatcher, never()).onError(any(Status.class));
|
||||
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
|
||||
ArgumentCaptor<Status> errorStatusCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(listenerWatcher).onError(errorStatusCaptor.capture());
|
||||
Status error = errorStatusCaptor.getValue();
|
||||
assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND);
|
||||
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
|
||||
}
|
||||
|
||||
/** Client receives a Listener with all match. */
|
||||
@Test
|
||||
public void ldsResponseWith_matchingListenerFound() {
|
||||
xdsClient.watchListenerData(PORT, listenerWatcher);
|
||||
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
|
||||
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
|
||||
|
||||
// Client sends an LDS request with null in lds resource name
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
|
||||
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
|
||||
|
||||
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
|
||||
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
|
||||
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
|
||||
.setPrefixLen(UInt32Value.of(32)).build()),
|
||||
CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default",
|
||||
"ROOTCA"),
|
||||
buildTestFilter("envoy.http_connection_manager"));
|
||||
List<Any> listeners = ImmutableList.of(
|
||||
Any.pack(buildListener("bar.googleapis.com",
|
||||
Any.pack(HttpConnectionManager.newBuilder()
|
||||
.setRouteConfig(
|
||||
buildRouteConfiguration("route-bar.googleapis.com",
|
||||
ImmutableList.of(
|
||||
buildVirtualHost(
|
||||
ImmutableList.of("bar.googleapis.com"),
|
||||
"cluster-bar.googleapis.com"))))
|
||||
.build()))),
|
||||
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0",
|
||||
filterChainOutbound,
|
||||
filterChainInbound
|
||||
)));
|
||||
DiscoveryResponse response =
|
||||
buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
|
||||
responseObserver.onNext(response);
|
||||
|
||||
// Client sends an ACK LDS request.
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
|
||||
|
||||
verify(listenerWatcher, never()).onError(any(Status.class));
|
||||
ArgumentCaptor<ListenerUpdate> listenerUpdateCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(listenerWatcher, times(1)).onListenerChanged(listenerUpdateCaptor.capture());
|
||||
ListenerUpdate configUpdate = listenerUpdateCaptor.getValue();
|
||||
EnvoyServerProtoData.Listener listener = configUpdate.getListener();
|
||||
assertThat(listener.getName()).isEqualTo(LISTENER_NAME);
|
||||
assertThat(listener.getAddress()).isEqualTo("0.0.0.0:" + PORT);
|
||||
EnvoyServerProtoData.FilterChain[] expected = new EnvoyServerProtoData.FilterChain[]{
|
||||
EnvoyServerProtoData.FilterChain.fromEnvoyProtoFilterChain(filterChainOutbound),
|
||||
EnvoyServerProtoData.FilterChain.fromEnvoyProtoFilterChain(filterChainInbound)
|
||||
};
|
||||
assertThat(listener.getFilterChains()).isEqualTo(Arrays.asList(expected));
|
||||
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
|
||||
}
|
||||
|
||||
/** Client receives LDS responses for updating Listener previously received. */
|
||||
@Test
|
||||
public void notifyUpdatedListener() {
|
||||
xdsClient.watchListenerData(PORT, listenerWatcher);
|
||||
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
|
||||
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
|
||||
|
||||
// Client sends an LDS request with null in lds resource name
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
|
||||
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
|
||||
|
||||
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
|
||||
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
|
||||
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
|
||||
.setPrefixLen(UInt32Value.of(32)).build()),
|
||||
CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default",
|
||||
"ROOTCA"),
|
||||
buildTestFilter("envoy.http_connection_manager"));
|
||||
List<Any> listeners = ImmutableList.of(
|
||||
Any.pack(buildListener("bar.googleapis.com",
|
||||
Any.pack(HttpConnectionManager.newBuilder()
|
||||
.setRouteConfig(
|
||||
buildRouteConfiguration("route-bar.googleapis.com",
|
||||
ImmutableList.of(
|
||||
buildVirtualHost(
|
||||
ImmutableList.of("bar.googleapis.com"),
|
||||
"cluster-bar.googleapis.com"))))
|
||||
.build()))),
|
||||
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0",
|
||||
filterChainOutbound,
|
||||
filterChainInbound
|
||||
)));
|
||||
DiscoveryResponse response =
|
||||
buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
|
||||
responseObserver.onNext(response);
|
||||
|
||||
// Client sends an ACK LDS request.
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
|
||||
|
||||
verify(listenerWatcher, never()).onError(any(Status.class));
|
||||
ArgumentCaptor<ListenerUpdate> listenerUpdateCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(listenerWatcher, times(1)).onListenerChanged(listenerUpdateCaptor.capture());
|
||||
|
||||
// Management sends back another LDS response containing updates for the requested Listener.
|
||||
final FilterChain filterChainNewInbound = buildFilterChain(buildFilterChainMatch(PORT,
|
||||
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
|
||||
.setPrefixLen(UInt32Value.of(32)).build()),
|
||||
CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default1",
|
||||
"ROOTCA2"),
|
||||
buildTestFilter("envoy.http_connection_manager"));
|
||||
List<Any> listeners1 = ImmutableList.of(
|
||||
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0",
|
||||
filterChainNewInbound
|
||||
)));
|
||||
DiscoveryResponse response1 =
|
||||
buildDiscoveryResponse("1", listeners1, XdsClientImpl.ADS_TYPE_URL_LDS, "0001");
|
||||
responseObserver.onNext(response1);
|
||||
|
||||
// Client sends an ACK LDS request.
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "0001")));
|
||||
|
||||
// Updated listener is notified to config watcher.
|
||||
listenerUpdateCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(listenerWatcher, times(2)).onListenerChanged(listenerUpdateCaptor.capture());
|
||||
ListenerUpdate configUpdate = listenerUpdateCaptor.getValue();
|
||||
EnvoyServerProtoData.Listener listener = configUpdate.getListener();
|
||||
assertThat(listener.getName()).isEqualTo(LISTENER_NAME);
|
||||
EnvoyServerProtoData.FilterChain[] expected = new EnvoyServerProtoData.FilterChain[]{
|
||||
EnvoyServerProtoData.FilterChain.fromEnvoyProtoFilterChain(filterChainNewInbound)
|
||||
};
|
||||
assertThat(listener.getFilterChains()).isEqualTo(Arrays.asList(expected));
|
||||
}
|
||||
|
||||
/**
|
||||
* Client receives LDS response containing matching name but non-matching IP address. Test
|
||||
* disabled until IP matching logic implemented.
|
||||
*/
|
||||
@Ignore
|
||||
@Test
|
||||
public void ldsResponse_nonMatchingIpAddress() {
|
||||
xdsClient.watchListenerData(PORT, listenerWatcher);
|
||||
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
|
||||
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
|
||||
|
||||
// Client sends an LDS request with null in lds resource name
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
|
||||
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
|
||||
|
||||
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(8000), null);
|
||||
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(PORT,
|
||||
CidrRange.newBuilder().setAddressPrefix(DIFFERENT_IP)
|
||||
.setPrefixLen(UInt32Value.of(32)).build()),
|
||||
CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default",
|
||||
"ROOTCA"),
|
||||
buildTestFilter("envoy.http_connection_manager"));
|
||||
List<Any> listeners = ImmutableList.of(
|
||||
Any.pack(buildListener("bar.googleapis.com",
|
||||
Any.pack(HttpConnectionManager.newBuilder()
|
||||
.setRouteConfig(
|
||||
buildRouteConfiguration("route-bar.googleapis.com",
|
||||
ImmutableList.of(
|
||||
buildVirtualHost(
|
||||
ImmutableList.of("bar.googleapis.com"),
|
||||
"cluster-bar.googleapis.com"))))
|
||||
.build()))),
|
||||
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
|
||||
filterChainInbound,
|
||||
filterChainOutbound
|
||||
)));
|
||||
DiscoveryResponse response =
|
||||
buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
|
||||
responseObserver.onNext(response);
|
||||
|
||||
// Client sends an ACK LDS request.
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
|
||||
|
||||
verify(listenerWatcher, never()).onError(any(Status.class));
|
||||
verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
|
||||
}
|
||||
|
||||
/** Client receives LDS response containing non-matching port in the filterMatch. */
|
||||
@Test
|
||||
public void ldsResponse_nonMatchingPort() {
|
||||
xdsClient.watchListenerData(PORT, listenerWatcher);
|
||||
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
|
||||
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
|
||||
|
||||
// Client sends an LDS request with null in lds resource name
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
|
||||
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
|
||||
|
||||
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(8000), null);
|
||||
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(
|
||||
PORT + 1, // add 1 to mismatch
|
||||
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
|
||||
.setPrefixLen(UInt32Value.of(32)).build()),
|
||||
CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default",
|
||||
"ROOTCA"),
|
||||
buildTestFilter("envoy.http_connection_manager"));
|
||||
List<Any> listeners = ImmutableList.of(
|
||||
Any.pack(buildListener("bar.googleapis.com",
|
||||
Any.pack(HttpConnectionManager.newBuilder()
|
||||
.setRouteConfig(
|
||||
buildRouteConfiguration("route-bar.googleapis.com",
|
||||
ImmutableList.of(
|
||||
buildVirtualHost(
|
||||
ImmutableList.of("bar.googleapis.com"),
|
||||
"cluster-bar.googleapis.com"))))
|
||||
.build()))),
|
||||
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0",
|
||||
filterChainInbound,
|
||||
filterChainOutbound
|
||||
)));
|
||||
DiscoveryResponse response =
|
||||
buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
|
||||
responseObserver.onNext(response);
|
||||
|
||||
// Client sends an ACK LDS request.
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
|
||||
|
||||
verify(listenerWatcher, never()).onError(any(Status.class));
|
||||
verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* RPC stream close and retry while there is listener watcher registered.
|
||||
*/
|
||||
@Test
|
||||
public void streamClosedAndRetry() {
|
||||
InOrder inOrder =
|
||||
Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
|
||||
backoffPolicy2);
|
||||
xdsClient.watchListenerData(PORT, listenerWatcher);
|
||||
|
||||
ArgumentCaptor<StreamObserver<DiscoveryResponse>> responseObserverCaptor =
|
||||
ArgumentCaptor.forClass(null);
|
||||
inOrder.verify(mockedDiscoveryService)
|
||||
.streamAggregatedResources(responseObserverCaptor.capture());
|
||||
StreamObserver<DiscoveryResponse> responseObserver =
|
||||
responseObserverCaptor.getValue(); // same as responseObservers.poll()
|
||||
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
|
||||
|
||||
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
|
||||
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
|
||||
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
|
||||
.setPrefixLen(UInt32Value.of(32)).build()),
|
||||
CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default",
|
||||
"ROOTCA"),
|
||||
buildTestFilter("envoy.http_connection_manager"));
|
||||
List<Any> listeners = ImmutableList.of(
|
||||
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
|
||||
filterChainOutbound,
|
||||
filterChainInbound
|
||||
)));
|
||||
DiscoveryResponse response =
|
||||
buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
|
||||
responseObserver.onNext(response);
|
||||
|
||||
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(null);
|
||||
|
||||
// Management server closes the RPC stream with an error.
|
||||
responseObserver.onError(Status.UNKNOWN.asException());
|
||||
verify(listenerWatcher).onError(statusCaptor.capture());
|
||||
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
|
||||
|
||||
// Resets backoff and retry immediately.
|
||||
inOrder.verify(backoffPolicyProvider).get();
|
||||
fakeClock.runDueTasks();
|
||||
inOrder.verify(mockedDiscoveryService)
|
||||
.streamAggregatedResources(responseObserverCaptor.capture());
|
||||
responseObserver = responseObserverCaptor.getValue();
|
||||
requestObserver = requestObservers.poll();
|
||||
|
||||
// Retry resumes requests for all wanted resources.
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
|
||||
|
||||
// Management server becomes unreachable.
|
||||
responseObserver.onError(Status.UNAVAILABLE.asException());
|
||||
verify(listenerWatcher, times(2)).onError(statusCaptor.capture());
|
||||
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||
inOrder.verify(backoffPolicy1).nextBackoffNanos();
|
||||
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
|
||||
|
||||
// Retry after backoff.
|
||||
fakeClock.forwardNanos(9L);
|
||||
assertThat(requestObservers).isEmpty();
|
||||
fakeClock.forwardNanos(1L);
|
||||
inOrder.verify(mockedDiscoveryService)
|
||||
.streamAggregatedResources(responseObserverCaptor.capture());
|
||||
responseObserver = responseObserverCaptor.getValue();
|
||||
requestObserver = requestObservers.poll();
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
|
||||
|
||||
// Management server is still not reachable.
|
||||
responseObserver.onError(Status.UNAVAILABLE.asException());
|
||||
verify(listenerWatcher, times(3)).onError(statusCaptor.capture());
|
||||
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||
inOrder.verify(backoffPolicy1).nextBackoffNanos();
|
||||
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
|
||||
|
||||
// Retry after backoff.
|
||||
fakeClock.forwardNanos(99L);
|
||||
assertThat(requestObservers).isEmpty();
|
||||
fakeClock.forwardNanos(1L);
|
||||
inOrder.verify(mockedDiscoveryService)
|
||||
.streamAggregatedResources(responseObserverCaptor.capture());
|
||||
responseObserver = responseObserverCaptor.getValue();
|
||||
requestObserver = requestObservers.poll();
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
|
||||
|
||||
// Management server sends back a LDS response.
|
||||
response = buildDiscoveryResponse("1", listeners,
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "0001");
|
||||
responseObserver.onNext(response);
|
||||
|
||||
// Client sent an LDS ACK request (Omitted).
|
||||
|
||||
// Management server closes the RPC stream.
|
||||
responseObserver.onCompleted();
|
||||
verify(listenerWatcher, times(4)).onError(any(Status.class));
|
||||
|
||||
// Resets backoff and retry immediately
|
||||
inOrder.verify(backoffPolicyProvider).get();
|
||||
fakeClock.runDueTasks();
|
||||
inOrder.verify(mockedDiscoveryService)
|
||||
.streamAggregatedResources(responseObserverCaptor.capture());
|
||||
responseObserver = responseObserverCaptor.getValue();
|
||||
requestObserver = requestObservers.poll();
|
||||
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
|
||||
|
||||
// Management server becomes unreachable again.
|
||||
responseObserver.onError(Status.UNAVAILABLE.asException());
|
||||
verify(listenerWatcher, times(5)).onError(statusCaptor.capture());
|
||||
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||
inOrder.verify(backoffPolicy2).nextBackoffNanos();
|
||||
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
|
||||
|
||||
// Retry after backoff.
|
||||
fakeClock.forwardNanos(19L);
|
||||
assertThat(requestObservers).isEmpty();
|
||||
fakeClock.forwardNanos(1L);
|
||||
inOrder.verify(mockedDiscoveryService)
|
||||
.streamAggregatedResources(responseObserverCaptor.capture());
|
||||
requestObserver = requestObservers.poll();
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
|
||||
|
||||
verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
|
||||
backoffPolicy2);
|
||||
}
|
||||
|
||||
static Listener buildListenerWithFilterChain(String name, int portValue, String address,
|
||||
FilterChain... filterChains) {
|
||||
Address listenerAddress = Address.newBuilder()
|
||||
.setSocketAddress(SocketAddress.newBuilder()
|
||||
.setPortValue(portValue).setAddress(address))
|
||||
.build();
|
||||
return
|
||||
Listener.newBuilder()
|
||||
.setName(name)
|
||||
.setAddress(listenerAddress)
|
||||
.addAllFilterChains(Arrays.asList(filterChains))
|
||||
.build();
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
static FilterChain buildFilterChain(FilterChainMatch filterChainMatch,
|
||||
DownstreamTlsContext tlsContext, Filter...filters) {
|
||||
return
|
||||
FilterChain.newBuilder()
|
||||
.setFilterChainMatch(filterChainMatch)
|
||||
.setTlsContext(tlsContext == null
|
||||
? DownstreamTlsContext.getDefaultInstance() : tlsContext)
|
||||
.addAllFilters(Arrays.asList(filters))
|
||||
.build();
|
||||
}
|
||||
|
||||
static FilterChainMatch buildFilterChainMatch(int destPort, CidrRange...prefixRanges) {
|
||||
return
|
||||
FilterChainMatch.newBuilder()
|
||||
.setDestinationPort(UInt32Value.of(destPort))
|
||||
.addAllPrefixRanges(Arrays.asList(prefixRanges))
|
||||
.build();
|
||||
}
|
||||
|
||||
static Filter buildTestFilter(String name) {
|
||||
return
|
||||
Filter.newBuilder()
|
||||
.setName(name)
|
||||
.setTypedConfig(
|
||||
Any.newBuilder()
|
||||
.setTypeUrl(TYPE_URL_HCM))
|
||||
.build();
|
||||
}
|
||||
}
|
|
@ -128,15 +128,20 @@ public class CommonTlsContextTestsUtil {
|
|||
|
||||
/** Helper method for creating DownstreamTlsContext values for tests. */
|
||||
public static DownstreamTlsContext buildTestDownstreamTlsContext() {
|
||||
return buildTestDownstreamTlsContext("google-sds-config-default", "ROOTCA");
|
||||
}
|
||||
|
||||
/** Helper method for creating DownstreamTlsContext values with names. */
|
||||
public static DownstreamTlsContext buildTestDownstreamTlsContext(
|
||||
String certName, String validationContextName) {
|
||||
return buildDownstreamTlsContext(
|
||||
buildCommonTlsContextWithAdditionalValues(
|
||||
"google-sds-config-default",
|
||||
certName,
|
||||
"unix:/var/run/sds/uds_path",
|
||||
"ROOTCA",
|
||||
validationContextName,
|
||||
"unix:/var/run/sds/uds_path",
|
||||
Arrays.asList("spiffe://grpc-sds-testing.svc.id.goog/ns/default/sa/bob"),
|
||||
Arrays.asList("managed-tls"),
|
||||
null
|
||||
));
|
||||
null));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue