xds: remove ResourceType enum, use XdsResourceType instead (#9587)

Now the xds resources are dynamically managed in resourceStore in xdsClient. The types is a xdsResourceType, singleton.
There is no longer hardcoded static list of known resource types, the subscription list is the source of truth.
AbstractXdsClient that manages AdsStream will only accept the xds resource types that has already has watchers subscribed to, same behaviour as before.
This commit is contained in:
yifeizhuang 2022-10-06 13:10:55 -07:00 committed by GitHub
parent cff8bd8d87
commit 68339250e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 206 additions and 263 deletions

View File

@ -48,7 +48,6 @@ import io.grpc.stub.StreamObserver;
import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.Node; import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ResourceStore; import io.grpc.xds.XdsClient.ResourceStore;
import io.grpc.xds.XdsClient.ResourceUpdate;
import io.grpc.xds.XdsClient.XdsResponseHandler; import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsClientImpl.XdsChannelFactory; import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsLogger.XdsLogLevel;
@ -82,7 +81,7 @@ final class AbstractXdsClient {
// Last successfully applied version_info for each resource type. Starts with empty string. // Last successfully applied version_info for each resource type. Starts with empty string.
// A version_info is used to update management server with client's most recent knowledge of // A version_info is used to update management server with client's most recent knowledge of
// resources. // resources.
private final Map<ResourceType, String> versions = new HashMap<>(); private final Map<XdsResourceType<?>, String> versions = new HashMap<>();
private boolean shutdown; private boolean shutdown;
@Nullable @Nullable
@ -160,8 +159,7 @@ final class AbstractXdsClient {
if (adsStream == null) { if (adsStream == null) {
startRpcStream(); startRpcStream();
} }
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
resourceType.typeName());
if (resources != null) { if (resources != null) {
adsStream.sendDiscoveryRequest(resourceType, resources); adsStream.sendDiscoveryRequest(resourceType, resources);
} }
@ -172,11 +170,10 @@ final class AbstractXdsClient {
* and sends an ACK request to the management server. * and sends an ACK request to the management server.
*/ */
// Must be synchronized. // Must be synchronized.
void ackResponse(XdsResourceType<?> xdsResourceType, String versionInfo, String nonce) { void ackResponse(XdsResourceType<?> type, String versionInfo, String nonce) {
ResourceType type = xdsResourceType.typeName();
versions.put(type, versionInfo); versions.put(type, versionInfo);
logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}", logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
type, nonce, versionInfo); type.typeName(), nonce, versionInfo);
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type); Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
if (resources == null) { if (resources == null) {
resources = Collections.emptyList(); resources = Collections.emptyList();
@ -189,11 +186,10 @@ final class AbstractXdsClient {
* accepted version) to the management server. * accepted version) to the management server.
*/ */
// Must be synchronized. // Must be synchronized.
void nackResponse(XdsResourceType<?> xdsResourceType, String nonce, String errorDetail) { void nackResponse(XdsResourceType<?> type, String nonce, String errorDetail) {
ResourceType type = xdsResourceType.typeName();
String versionInfo = versions.getOrDefault(type, ""); String versionInfo = versions.getOrDefault(type, "");
logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}", logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
type, nonce, versionInfo); type.typeName(), nonce, versionInfo);
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type); Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
if (resources == null) { if (resources == null) {
resources = Collections.emptyList(); resources = Collections.emptyList();
@ -239,77 +235,38 @@ final class AbstractXdsClient {
return; return;
} }
startRpcStream(); startRpcStream();
for (ResourceType type : ResourceType.values()) { for (XdsResourceType<?> type : resourceStore.getXdsResourceTypes()) {
if (type == ResourceType.UNKNOWN) {
continue;
}
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type); Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
if (resources != null) { if (resources != null) {
adsStream.sendDiscoveryRequest(resourceStore.getXdsResourceType(type), resources); adsStream.sendDiscoveryRequest(type, resources);
} }
} }
xdsResponseHandler.handleStreamRestarted(serverInfo); xdsResponseHandler.handleStreamRestarted(serverInfo);
} }
} }
// TODO(zivy) : remove and replace with XdsResourceType @VisibleForTesting
enum ResourceType { @Nullable
UNKNOWN, LDS, RDS, CDS, EDS; static XdsResourceType<?> fromTypeUrl(String typeUrl) {
switch (typeUrl) {
String typeUrl() { case ADS_TYPE_URL_LDS:
switch (this) { // fall trough
case LDS: case ADS_TYPE_URL_LDS_V2:
return ADS_TYPE_URL_LDS; return XdsListenerResource.getInstance();
case RDS: case ADS_TYPE_URL_RDS:
return ADS_TYPE_URL_RDS; // fall through
case CDS: case ADS_TYPE_URL_RDS_V2:
return ADS_TYPE_URL_CDS; return XdsRouteConfigureResource.getInstance();
case EDS: case ADS_TYPE_URL_CDS:
return ADS_TYPE_URL_EDS; // fall through
case UNKNOWN: case ADS_TYPE_URL_CDS_V2:
default: return XdsClusterResource.getInstance();
throw new AssertionError("Unknown or missing case in enum switch: " + this); case ADS_TYPE_URL_EDS:
} // fall through
} case ADS_TYPE_URL_EDS_V2:
return XdsEndpointResource.getInstance();
String typeUrlV2() { default:
switch (this) { return null;
case LDS:
return ADS_TYPE_URL_LDS_V2;
case RDS:
return ADS_TYPE_URL_RDS_V2;
case CDS:
return ADS_TYPE_URL_CDS_V2;
case EDS:
return ADS_TYPE_URL_EDS_V2;
case UNKNOWN:
default:
throw new AssertionError("Unknown or missing case in enum switch: " + this);
}
}
@VisibleForTesting
static ResourceType fromTypeUrl(String typeUrl) {
switch (typeUrl) {
case ADS_TYPE_URL_LDS:
// fall trough
case ADS_TYPE_URL_LDS_V2:
return LDS;
case ADS_TYPE_URL_RDS:
// fall through
case ADS_TYPE_URL_RDS_V2:
return RDS;
case ADS_TYPE_URL_CDS:
// fall through
case ADS_TYPE_URL_CDS_V2:
return CDS;
case ADS_TYPE_URL_EDS:
// fall through
case ADS_TYPE_URL_EDS_V2:
return EDS;
default:
return UNKNOWN;
}
} }
} }
@ -322,7 +279,7 @@ final class AbstractXdsClient {
// used for management server to identify which response the client is ACKing/NACking. // used for management server to identify which response the client is ACKing/NACking.
// To avoid confusion, client-initiated requests will always use the nonce in // To avoid confusion, client-initiated requests will always use the nonce in
// most recently received responses of each resource type. // most recently received responses of each resource type.
private final Map<ResourceType, String> respNonces = new HashMap<>(); private final Map<XdsResourceType<?>, String> respNonces = new HashMap<>();
abstract void start(); abstract void start();
@ -334,27 +291,27 @@ final class AbstractXdsClient {
* client-initiated discovery requests, use {@link * client-initiated discovery requests, use {@link
* #sendDiscoveryRequest(XdsResourceType, Collection)}. * #sendDiscoveryRequest(XdsResourceType, Collection)}.
*/ */
abstract void sendDiscoveryRequest(ResourceType type, String version, abstract void sendDiscoveryRequest(XdsResourceType<?> type, String version,
Collection<String> resources, String nonce, @Nullable String errorDetail); Collection<String> resources, String nonce, @Nullable String errorDetail);
/** /**
* Sends a client-initiated discovery request. * Sends a client-initiated discovery request.
*/ */
final void sendDiscoveryRequest(XdsResourceType<? extends ResourceUpdate> xdsResourceType, final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources) {
Collection<String> resources) {
ResourceType type = xdsResourceType.typeName();
logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources); logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources, sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
respNonces.getOrDefault(type, ""), null); respNonces.getOrDefault(type, ""), null);
} }
final void handleRpcResponse( final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources,
ResourceType type, String versionInfo, List<Any> resources, String nonce) { String nonce) {
if (closed) { if (closed) {
return; return;
} }
responseReceived = true; responseReceived = true;
respNonces.put(type, nonce); if (type != null) {
respNonces.put(type, nonce);
}
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce); xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce);
} }
@ -422,7 +379,7 @@ final class AbstractXdsClient {
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
ResourceType type = ResourceType.fromTypeUrl(response.getTypeUrl()); XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
if (logger.isLoggable(XdsLogLevel.DEBUG)) { if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log( logger.log(
XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type, XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
@ -458,8 +415,9 @@ final class AbstractXdsClient {
} }
@Override @Override
void sendDiscoveryRequest(ResourceType type, String versionInfo, Collection<String> resources, void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
String nonce, @Nullable String errorDetail) { Collection<String> resources, String nonce,
@Nullable String errorDetail) {
checkState(requestWriter != null, "ADS stream has not been started"); checkState(requestWriter != null, "ADS stream has not been started");
io.envoyproxy.envoy.api.v2.DiscoveryRequest.Builder builder = io.envoyproxy.envoy.api.v2.DiscoveryRequest.Builder builder =
io.envoyproxy.envoy.api.v2.DiscoveryRequest.newBuilder() io.envoyproxy.envoy.api.v2.DiscoveryRequest.newBuilder()
@ -502,7 +460,7 @@ final class AbstractXdsClient {
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
ResourceType type = ResourceType.fromTypeUrl(response.getTypeUrl()); XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
if (logger.isLoggable(XdsLogLevel.DEBUG)) { if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log( logger.log(
XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type, XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
@ -538,8 +496,9 @@ final class AbstractXdsClient {
} }
@Override @Override
void sendDiscoveryRequest(ResourceType type, String versionInfo, Collection<String> resources, void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
String nonce, @Nullable String errorDetail) { Collection<String> resources, String nonce,
@Nullable String errorDetail) {
checkState(requestWriter != null, "ADS stream has not been started"); checkState(requestWriter != null, "ADS stream has not been started");
DiscoveryRequest.Builder builder = DiscoveryRequest.Builder builder =
DiscoveryRequest.newBuilder() DiscoveryRequest.newBuilder()

View File

@ -33,7 +33,6 @@ import io.grpc.Status;
import io.grpc.StatusException; import io.grpc.StatusException;
import io.grpc.internal.ObjectPool; import io.grpc.internal.ObjectPool;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.XdsClient.ResourceMetadata; import io.grpc.xds.XdsClient.ResourceMetadata;
import io.grpc.xds.XdsClient.ResourceMetadata.ResourceMetadataStatus; import io.grpc.xds.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.XdsClient.ResourceMetadata.UpdateFailureState; import io.grpc.xds.XdsClient.ResourceMetadata.UpdateFailureState;
@ -156,12 +155,12 @@ public final class CsdsService extends
ClientConfig.Builder builder = ClientConfig.newBuilder() ClientConfig.Builder builder = ClientConfig.newBuilder()
.setNode(xdsClient.getBootstrapInfo().node().toEnvoyProtoNode()); .setNode(xdsClient.getBootstrapInfo().node().toEnvoyProtoNode());
Map<ResourceType, Map<String, ResourceMetadata>> metadataByType = Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
awaitSubscribedResourcesMetadata(xdsClient.getSubscribedResourcesMetadataSnapshot()); awaitSubscribedResourcesMetadata(xdsClient.getSubscribedResourcesMetadataSnapshot());
for (Map.Entry<ResourceType, Map<String, ResourceMetadata>> metadataByTypeEntry for (Map.Entry<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByTypeEntry
: metadataByType.entrySet()) { : metadataByType.entrySet()) {
ResourceType type = metadataByTypeEntry.getKey(); XdsResourceType<?> type = metadataByTypeEntry.getKey();
Map<String, ResourceMetadata> metadataByResourceName = metadataByTypeEntry.getValue(); Map<String, ResourceMetadata> metadataByResourceName = metadataByTypeEntry.getValue();
for (Map.Entry<String, ResourceMetadata> metadataEntry : metadataByResourceName.entrySet()) { for (Map.Entry<String, ResourceMetadata> metadataEntry : metadataByResourceName.entrySet()) {
String resourceName = metadataEntry.getKey(); String resourceName = metadataEntry.getKey();
@ -187,8 +186,9 @@ public final class CsdsService extends
return builder.build(); return builder.build();
} }
private static Map<ResourceType, Map<String, ResourceMetadata>> awaitSubscribedResourcesMetadata( private static Map<XdsResourceType<?>, Map<String, ResourceMetadata>>
ListenableFuture<Map<ResourceType, Map<String, ResourceMetadata>>> future) awaitSubscribedResourcesMetadata(
ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future)
throws InterruptedException { throws InterruptedException {
try { try {
// Normally this shouldn't take long, but add some slack for cases like a cold JVM. // Normally this shouldn't take long, but add some slack for cases like a cold JVM.

View File

@ -25,7 +25,6 @@ import com.google.common.net.UrlEscapers;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any; import com.google.protobuf.Any;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats; import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
@ -296,7 +295,7 @@ abstract class XdsClient {
* a map ("resource name": "resource metadata"). * a map ("resource name": "resource metadata").
*/ */
// Must be synchronized. // Must be synchronized.
ListenableFuture<Map<ResourceType, Map<String, ResourceMetadata>>> ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
getSubscribedResourcesMetadataSnapshot() { getSubscribedResourcesMetadataSnapshot() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@ -347,8 +346,8 @@ abstract class XdsClient {
interface XdsResponseHandler { interface XdsResponseHandler {
/** Called when a xds response is received. */ /** Called when a xds response is received. */
void handleResourceResponse( void handleResourceResponse(
ResourceType resourceType, ServerInfo serverInfo, String versionInfo, List<Any> resources, XdsResourceType<?> resourceType, ServerInfo serverInfo, String versionInfo,
String nonce); List<Any> resources, String nonce);
/** Called when the ADS stream is closed passively. */ /** Called when the ADS stream is closed passively. */
// Must be synchronized. // Must be synchronized.
@ -369,9 +368,9 @@ abstract class XdsClient {
*/ */
// Must be synchronized. // Must be synchronized.
@Nullable @Nullable
Collection<String> getSubscribedResources(ServerInfo serverInfo, ResourceType type); Collection<String> getSubscribedResources(ServerInfo serverInfo,
XdsResourceType<? extends ResourceUpdate> type);
@Nullable Collection<XdsResourceType<? extends ResourceUpdate>> getXdsResourceTypes();
XdsResourceType<? extends ResourceUpdate> getXdsResourceType(ResourceType type);
} }
} }

View File

@ -18,10 +18,6 @@ package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.AbstractXdsClient.ResourceType.CDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.EDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.LDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.RDS;
import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME; import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;
import static io.grpc.xds.XdsResourceType.ParsedResource; import static io.grpc.xds.XdsResourceType.ParsedResource;
import static io.grpc.xds.XdsResourceType.ValidatedResourceUpdate; import static io.grpc.xds.XdsResourceType.ValidatedResourceUpdate;
@ -46,7 +42,6 @@ import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy; import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.TimeProvider; import io.grpc.internal.TimeProvider;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.Bootstrapper.AuthorityInfo; import io.grpc.xds.Bootstrapper.AuthorityInfo;
import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
@ -96,12 +91,6 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
private final Map<XdsResourceType<? extends ResourceUpdate>, private final Map<XdsResourceType<? extends ResourceUpdate>,
Map<String, ResourceSubscriber<? extends ResourceUpdate>>> Map<String, ResourceSubscriber<? extends ResourceUpdate>>>
resourceSubscribers = new HashMap<>(); resourceSubscribers = new HashMap<>();
private final Map<ResourceType, XdsResourceType<? extends ResourceUpdate>> xdsResourceTypeMap =
ImmutableMap.of(
LDS, XdsListenerResource.getInstance(),
RDS, XdsRouteConfigureResource.getInstance(),
CDS, XdsClusterResource.getInstance(),
EDS, XdsEndpointResource.getInstance());
private final LoadStatsManager2 loadStatsManager; private final LoadStatsManager2 loadStatsManager;
private final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>(); private final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
private final XdsChannelFactory xdsChannelFactory; private final XdsChannelFactory xdsChannelFactory;
@ -166,17 +155,16 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
@Override @Override
public void handleResourceResponse( public void handleResourceResponse(
ResourceType resourceType, ServerInfo serverInfo, String versionInfo, List<Any> resources, XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo,
String nonce) { List<Any> resources, String nonce) {
syncContext.throwIfNotInThisSynchronizationContext(); syncContext.throwIfNotInThisSynchronizationContext();
XdsResourceType<? extends ResourceUpdate> xdsResourceType =
xdsResourceTypeMap.get(resourceType);
if (xdsResourceType == null) { if (xdsResourceType == null) {
logger.log(XdsLogLevel.WARNING, "Ignore an unknown type of DiscoveryResponse"); logger.log(XdsLogLevel.WARNING, "Ignore an unknown type of DiscoveryResponse");
return; return;
} }
Set<String> toParseResourceNames = null; Set<String> toParseResourceNames = null;
if (!(resourceType == LDS || resourceType == RDS) if (!(xdsResourceType == XdsListenerResource.getInstance()
|| xdsResourceType == XdsRouteConfigureResource.getInstance())
&& resourceSubscribers.containsKey(xdsResourceType)) { && resourceSubscribers.containsKey(xdsResourceType)) {
toParseResourceNames = resourceSubscribers.get(xdsResourceType).keySet(); toParseResourceNames = resourceSubscribers.get(xdsResourceType).keySet();
} }
@ -239,23 +227,17 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
return isShutdown; return isShutdown;
} }
private Map<String, ResourceSubscriber<? extends ResourceUpdate>> getSubscribedResourcesMap(
ResourceType type) {
return resourceSubscribers.getOrDefault(xdsResourceTypeMap.get(type), Collections.emptyMap());
}
@Nullable
@Override @Override
public XdsResourceType<? extends ResourceUpdate> getXdsResourceType(ResourceType type) { public Collection<XdsResourceType<? extends ResourceUpdate>> getXdsResourceTypes() {
return xdsResourceTypeMap.get(type); return resourceSubscribers.keySet();
} }
@Nullable @Nullable
@Override @Override
public Collection<String> getSubscribedResources(ServerInfo serverInfo, public Collection<String> getSubscribedResources(ServerInfo serverInfo,
ResourceType type) { XdsResourceType<? extends ResourceUpdate> type) {
Map<String, ResourceSubscriber<? extends ResourceUpdate>> resources = Map<String, ResourceSubscriber<? extends ResourceUpdate>> resources =
getSubscribedResourcesMap(type); resourceSubscribers.getOrDefault(type, Collections.emptyMap());
ImmutableSet.Builder<String> builder = ImmutableSet.builder(); ImmutableSet.Builder<String> builder = ImmutableSet.builder();
for (String key : resources.keySet()) { for (String key : resources.keySet()) {
if (resources.get(key).serverInfo.equals(serverInfo)) { if (resources.get(key).serverInfo.equals(serverInfo)) {
@ -266,26 +248,26 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
return retVal.isEmpty() ? null : retVal; return retVal.isEmpty() ? null : retVal;
} }
// As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic.
// ResourceTypes that do not have subscribers does not show up in the snapshot keys.
@Override @Override
ListenableFuture<Map<ResourceType, Map<String, ResourceMetadata>>> ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
getSubscribedResourcesMetadataSnapshot() { getSubscribedResourcesMetadataSnapshot() {
final SettableFuture<Map<ResourceType, Map<String, ResourceMetadata>>> future = final SettableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future =
SettableFuture.create(); SettableFuture.create();
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
// A map from a "resource type" to a map ("resource name": "resource metadata") // A map from a "resource type" to a map ("resource name": "resource metadata")
ImmutableMap.Builder<ResourceType, Map<String, ResourceMetadata>> metadataSnapshot = ImmutableMap.Builder<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataSnapshot =
ImmutableMap.builder(); ImmutableMap.builder();
for (XdsResourceType<? extends ResourceUpdate> resourceType: xdsResourceTypeMap.values()) { for (XdsResourceType<?> resourceType: resourceSubscribers.keySet()) {
ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder(); ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder();
Map<String, ResourceSubscriber<? extends ResourceUpdate>> resourceSubscriberMap =
resourceSubscribers.getOrDefault(resourceType, Collections.emptyMap());
for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry
: resourceSubscriberMap.entrySet()) { : resourceSubscribers.get(resourceType).entrySet()) {
metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata); metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata);
} }
metadataSnapshot.put(resourceType.typeName(), metadataMap.buildOrThrow()); metadataSnapshot.put(resourceType, metadataMap.buildOrThrow());
} }
future.set(metadataSnapshot.buildOrThrow()); future.set(metadataSnapshot.buildOrThrow());
} }
@ -312,7 +294,7 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);; (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
if (subscriber == null) { if (subscriber == null) {
logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName); logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName);
subscriber = new ResourceSubscriber<>(type.typeName(), resourceName); subscriber = new ResourceSubscriber<>(type, resourceName);
resourceSubscribers.get(type).put(resourceName, subscriber); resourceSubscribers.get(type).put(resourceName, subscriber);
if (subscriber.xdsChannel != null) { if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(type); subscriber.xdsChannel.adjustResourceSubscription(type);
@ -427,8 +409,9 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
} }
long updateTime = timeProvider.currentTimeNanos(); long updateTime = timeProvider.currentTimeNanos();
for (Map.Entry<String, ResourceSubscriber<?>> entry : Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscribedResources =
getSubscribedResourcesMap(xdsResourceType.typeName()).entrySet()) { resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap());
for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
String resourceName = entry.getKey(); String resourceName = entry.getKey();
ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue(); ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue();
@ -473,7 +456,7 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
// LDS/CDS responses represents the state of the world, RDS/EDS resources not referenced in // LDS/CDS responses represents the state of the world, RDS/EDS resources not referenced in
// LDS/CDS resources should be deleted. // LDS/CDS resources should be deleted.
if (xdsResourceType.dependentResource() != null) { if (xdsResourceType.dependentResource() != null) {
XdsResourceType<?> dependency = xdsResourceTypeMap.get(xdsResourceType.dependentResource()); XdsResourceType<?> dependency = xdsResourceType.dependentResource();
Map<String, ResourceSubscriber<? extends ResourceUpdate>> dependentSubscribers = Map<String, ResourceSubscriber<? extends ResourceUpdate>> dependentSubscribers =
resourceSubscribers.get(dependency); resourceSubscribers.get(dependency);
if (dependentSubscribers == null) { if (dependentSubscribers == null) {
@ -493,13 +476,13 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
return; return;
} }
String resourceName = null; String resourceName = null;
if (subscriber.type == LDS) { if (subscriber.type == XdsListenerResource.getInstance()) {
LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data; LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager(); io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
if (hcm != null) { if (hcm != null) {
resourceName = hcm.rdsName(); resourceName = hcm.rdsName();
} }
} else if (subscriber.type == CDS) { } else if (subscriber.type == XdsClusterResource.getInstance()) {
CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data; CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
resourceName = cdsUpdate.edsServiceName(); resourceName = cdsUpdate.edsServiceName();
} }
@ -515,7 +498,7 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
private final class ResourceSubscriber<T extends ResourceUpdate> { private final class ResourceSubscriber<T extends ResourceUpdate> {
@Nullable private final ServerInfo serverInfo; @Nullable private final ServerInfo serverInfo;
@Nullable private final AbstractXdsClient xdsChannel; @Nullable private final AbstractXdsClient xdsChannel;
private final ResourceType type; private final XdsResourceType<T> type;
private final String resource; private final String resource;
private final Set<ResourceWatcher<T>> watchers = new HashSet<>(); private final Set<ResourceWatcher<T>> watchers = new HashSet<>();
@Nullable private T data; @Nullable private T data;
@ -527,7 +510,7 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
@Nullable private ResourceMetadata metadata; @Nullable private ResourceMetadata metadata;
@Nullable private String errorDescription; @Nullable private String errorDescription;
ResourceSubscriber(ResourceType type, String resource) { ResourceSubscriber(XdsResourceType<T> type, String resource) {
syncContext.throwIfNotInThisSynchronizationContext(); syncContext.throwIfNotInThisSynchronizationContext();
this.type = type; this.type = type;
this.resource = resource; this.resource = resource;
@ -669,7 +652,8 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
// and the resource is reusable. // and the resource is reusable.
boolean ignoreResourceDeletionEnabled = boolean ignoreResourceDeletionEnabled =
serverInfo != null && serverInfo.ignoreResourceDeletion(); serverInfo != null && serverInfo.ignoreResourceDeletion();
boolean isStateOfTheWorld = (type == LDS || type == CDS); boolean isStateOfTheWorld = (type == XdsListenerResource.getInstance()
|| type == XdsClusterResource.getInstance());
if (ignoreResourceDeletionEnabled && isStateOfTheWorld && data != null) { if (ignoreResourceDeletionEnabled && isStateOfTheWorld && data != null) {
if (!resourceDeletionIgnored) { if (!resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_WARNING, logger.log(XdsLogLevel.FORCE_WARNING,

View File

@ -17,9 +17,6 @@
package io.grpc.xds; package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.AbstractXdsClient.ResourceType;
import static io.grpc.xds.AbstractXdsClient.ResourceType.CDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.EDS;
import static io.grpc.xds.Bootstrapper.ServerInfo; import static io.grpc.xds.Bootstrapper.ServerInfo;
import com.google.auto.value.AutoValue; import com.google.auto.value.AutoValue;
@ -77,8 +74,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
} }
@Override @Override
ResourceType typeName() { String typeName() {
return CDS; return "CDS";
} }
@Override @Override
@ -93,8 +90,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
@Nullable @Nullable
@Override @Override
ResourceType dependentResource() { XdsResourceType<?> dependentResource() {
return EDS; return XdsEndpointResource.getInstance();
} }
@Override @Override

View File

@ -17,8 +17,6 @@
package io.grpc.xds; package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.AbstractXdsClient.ResourceType;
import static io.grpc.xds.AbstractXdsClient.ResourceType.EDS;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects;
@ -66,8 +64,8 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
} }
@Override @Override
ResourceType typeName() { String typeName() {
return EDS; return "EDS";
} }
@Override @Override
@ -82,7 +80,7 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
@Nullable @Nullable
@Override @Override
ResourceType dependentResource() { XdsResourceType<?> dependentResource() {
return null; return null;
} }

View File

@ -17,9 +17,6 @@
package io.grpc.xds; package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.AbstractXdsClient.ResourceType;
import static io.grpc.xds.AbstractXdsClient.ResourceType.LDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.RDS;
import static io.grpc.xds.XdsClient.ResourceUpdate; import static io.grpc.xds.XdsClient.ResourceUpdate;
import static io.grpc.xds.XdsClientImpl.ResourceInvalidException; import static io.grpc.xds.XdsClientImpl.ResourceInvalidException;
import static io.grpc.xds.XdsClusterResource.validateCommonTlsContext; import static io.grpc.xds.XdsClusterResource.validateCommonTlsContext;
@ -81,8 +78,8 @@ class XdsListenerResource extends XdsResourceType<LdsUpdate> {
} }
@Override @Override
ResourceType typeName() { String typeName() {
return LDS; return "LDS";
} }
@Override @Override
@ -102,8 +99,8 @@ class XdsListenerResource extends XdsResourceType<LdsUpdate> {
@Nullable @Nullable
@Override @Override
ResourceType dependentResource() { XdsResourceType<?> dependentResource() {
return RDS; return XdsRouteConfigureResource.getInstance();
} }
@Override @Override

View File

@ -47,7 +47,6 @@ import io.grpc.Status.Code;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool; import io.grpc.internal.ObjectPool;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.Bootstrapper.AuthorityInfo; import io.grpc.xds.Bootstrapper.AuthorityInfo;
import io.grpc.xds.Bootstrapper.BootstrapInfo; import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig; import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig;
@ -202,8 +201,9 @@ final class XdsNameResolver extends NameResolver {
replacement = XdsClient.percentEncodePath(replacement); replacement = XdsClient.percentEncodePath(replacement);
} }
String ldsResourceName = expandPercentS(listenerNameTemplate, replacement); String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
if (!XdsClient.isResourceNameValid(ldsResourceName, ResourceType.LDS.typeUrl()) if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl())
&& !XdsClient.isResourceNameValid(ldsResourceName, ResourceType.LDS.typeUrlV2())) { && !XdsClient.isResourceNameValid(ldsResourceName,
XdsListenerResource.getInstance().typeUrlV2())) {
listener.onError(Status.INVALID_ARGUMENT.withDescription( listener.onError(Status.INVALID_ARGUMENT.withDescription(
"invalid listener resource URI for service authority: " + serviceAuthority)); "invalid listener resource URI for service authority: " + serviceAuthority));
return; return;

View File

@ -17,7 +17,6 @@
package io.grpc.xds; package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.AbstractXdsClient.ResourceType;
import static io.grpc.xds.Bootstrapper.ServerInfo; import static io.grpc.xds.Bootstrapper.ServerInfo;
import static io.grpc.xds.XdsClient.ResourceUpdate; import static io.grpc.xds.XdsClient.ResourceUpdate;
import static io.grpc.xds.XdsClient.canonifyResourceName; import static io.grpc.xds.XdsClient.canonifyResourceName;
@ -80,7 +79,7 @@ abstract class XdsResourceType<T extends ResourceUpdate> {
abstract Class<? extends com.google.protobuf.Message> unpackedClassName(); abstract Class<? extends com.google.protobuf.Message> unpackedClassName();
abstract ResourceType typeName(); abstract String typeName();
abstract String typeUrl(); abstract String typeUrl();
@ -88,7 +87,7 @@ abstract class XdsResourceType<T extends ResourceUpdate> {
// Non-null for State of the World resources. // Non-null for State of the World resources.
@Nullable @Nullable
abstract ResourceType dependentResource(); abstract XdsResourceType<?> dependentResource();
static class Args { static class Args {
final ServerInfo serverInfo; final ServerInfo serverInfo;

View File

@ -17,8 +17,6 @@
package io.grpc.xds; package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.AbstractXdsClient.ResourceType;
import static io.grpc.xds.AbstractXdsClient.ResourceType.RDS;
import static io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import static io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
import com.github.udpa.udpa.type.v1.TypedStruct; import com.github.udpa.udpa.type.v1.TypedStruct;
@ -95,8 +93,8 @@ class XdsRouteConfigureResource extends XdsResourceType<RdsUpdate> {
} }
@Override @Override
ResourceType typeName() { String typeName() {
return RDS; return "RDS";
} }
@Override @Override
@ -111,7 +109,7 @@ class XdsRouteConfigureResource extends XdsResourceType<RdsUpdate> {
@Nullable @Nullable
@Override @Override
ResourceType dependentResource() { XdsResourceType<?> dependentResource() {
return null; return null;
} }

View File

@ -17,7 +17,6 @@
package io.grpc.xds; package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.AbstractXdsClient.ResourceType.CDS;
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
@ -657,7 +656,7 @@ public class CdsLoadBalancer2Test {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName, <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher) { ResourceWatcher<T> watcher) {
assertThat(type.typeName()).isEqualTo(CDS); assertThat(type.typeName()).isEqualTo("CDS");
assertThat(watchers).doesNotContainKey(resourceName); assertThat(watchers).doesNotContainKey(resourceName);
watchers.put(resourceName, (ResourceWatcher<CdsUpdate>)watcher); watchers.put(resourceName, (ResourceWatcher<CdsUpdate>)watcher);
} }
@ -667,7 +666,7 @@ public class CdsLoadBalancer2Test {
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type, <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName, String resourceName,
ResourceWatcher<T> watcher) { ResourceWatcher<T> watcher) {
assertThat(type.typeName()).isEqualTo(CDS); assertThat(type.typeName()).isEqualTo("CDS");
assertThat(watchers).containsKey(resourceName); assertThat(watchers).containsKey(resourceName);
watchers.remove(resourceName); watchers.remove(resourceName);
} }

View File

@ -17,7 +17,6 @@
package io.grpc.xds; package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.AbstractXdsClient.ResourceType.EDS;
import static io.grpc.xds.XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;
@ -1176,7 +1175,7 @@ public class ClusterResolverLoadBalancerTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName, <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher) { ResourceWatcher<T> watcher) {
assertThat(type.typeName()).isEqualTo(EDS); assertThat(type.typeName()).isEqualTo("EDS");
assertThat(watchers).doesNotContainKey(resourceName); assertThat(watchers).doesNotContainKey(resourceName);
watchers.put(resourceName, (ResourceWatcher<EdsUpdate>) watcher); watchers.put(resourceName, (ResourceWatcher<EdsUpdate>) watcher);
} }
@ -1186,7 +1185,7 @@ public class ClusterResolverLoadBalancerTest {
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type, <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName, String resourceName,
ResourceWatcher<T> watcher) { ResourceWatcher<T> watcher) {
assertThat(type.typeName()).isEqualTo(EDS); assertThat(type.typeName()).isEqualTo("EDS");
assertThat(watchers).containsKey(resourceName); assertThat(watchers).containsKey(resourceName);
watchers.remove(resourceName); watchers.remove(resourceName);
} }

View File

@ -17,10 +17,6 @@
package io.grpc.xds; package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.AbstractXdsClient.ResourceType.CDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.EDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.LDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.RDS;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
@ -49,13 +45,12 @@ import io.grpc.internal.ObjectPool;
import io.grpc.internal.testing.StreamRecorder; import io.grpc.internal.testing.StreamRecorder;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcServerRule; import io.grpc.testing.GrpcServerRule;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.Bootstrapper.BootstrapInfo; import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.XdsClient.ResourceMetadata; import io.grpc.xds.XdsClient.ResourceMetadata;
import io.grpc.xds.XdsClient.ResourceMetadata.ResourceMetadataStatus; import io.grpc.xds.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import java.util.EnumMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -81,6 +76,10 @@ public class CsdsServiceTest {
.node(BOOTSTRAP_NODE) .node(BOOTSTRAP_NODE)
.build(); .build();
private static final XdsClient XDS_CLIENT_NO_RESOURCES = new FakeXdsClient(); private static final XdsClient XDS_CLIENT_NO_RESOURCES = new FakeXdsClient();
private static final XdsResourceType<?> LDS = XdsListenerResource.getInstance();
private static final XdsResourceType<?> CDS = XdsClusterResource.getInstance();
private static final XdsResourceType<?> RDS = XdsRouteConfigureResource.getInstance();
private static final XdsResourceType<?> EDS = XdsEndpointResource.getInstance();
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public static class ServiceTests { public static class ServiceTests {
@ -126,7 +125,7 @@ public class CsdsServiceTest {
public void fetchClientConfig_unexpectedException() { public void fetchClientConfig_unexpectedException() {
XdsClient throwingXdsClient = new FakeXdsClient() { XdsClient throwingXdsClient = new FakeXdsClient() {
@Override @Override
ListenableFuture<Map<ResourceType, Map<String, ResourceMetadata>>> ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
getSubscribedResourcesMetadataSnapshot() { getSubscribedResourcesMetadataSnapshot() {
return Futures.immediateFailedFuture( return Futures.immediateFailedFuture(
new IllegalArgumentException("IllegalArgumentException")); new IllegalArgumentException("IllegalArgumentException"));
@ -150,12 +149,12 @@ public class CsdsServiceTest {
public void fetchClientConfig_interruptedException() { public void fetchClientConfig_interruptedException() {
XdsClient throwingXdsClient = new FakeXdsClient() { XdsClient throwingXdsClient = new FakeXdsClient() {
@Override @Override
ListenableFuture<Map<ResourceType, Map<String, ResourceMetadata>>> ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
getSubscribedResourcesMetadataSnapshot() { getSubscribedResourcesMetadataSnapshot() {
return Futures.submit( return Futures.submit(
new Callable<Map<ResourceType, Map<String, ResourceMetadata>>>() { new Callable<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>() {
@Override @Override
public Map<ResourceType, Map<String, ResourceMetadata>> call() { public Map<XdsResourceType<?>, Map<String, ResourceMetadata>> call() {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return null; return null;
} }
@ -323,9 +322,9 @@ public class CsdsServiceTest {
throws InterruptedException { throws InterruptedException {
ClientConfig clientConfig = CsdsService.getClientConfigForXdsClient(new FakeXdsClient() { ClientConfig clientConfig = CsdsService.getClientConfigForXdsClient(new FakeXdsClient() {
@Override @Override
protected Map<ResourceType, Map<String, ResourceMetadata>> protected Map<XdsResourceType<?>, Map<String, ResourceMetadata>>
getSubscribedResourcesMetadata() { getSubscribedResourcesMetadata() {
return new ImmutableMap.Builder<ResourceType, Map<String, ResourceMetadata>>() return new ImmutableMap.Builder<XdsResourceType<?>, Map<String, ResourceMetadata>>()
.put(LDS, ImmutableMap.of("subscribedResourceName.LDS", METADATA_ACKED_LDS)) .put(LDS, ImmutableMap.of("subscribedResourceName.LDS", METADATA_ACKED_LDS))
.put(RDS, ImmutableMap.of("subscribedResourceName.RDS", METADATA_ACKED_RDS)) .put(RDS, ImmutableMap.of("subscribedResourceName.RDS", METADATA_ACKED_RDS))
.put(CDS, ImmutableMap.of("subscribedResourceName.CDS", METADATA_ACKED_CDS)) .put(CDS, ImmutableMap.of("subscribedResourceName.CDS", METADATA_ACKED_CDS))
@ -340,7 +339,7 @@ public class CsdsServiceTest {
// is propagated to the correct resource types. // is propagated to the correct resource types.
int xdsConfigCount = clientConfig.getGenericXdsConfigsCount(); int xdsConfigCount = clientConfig.getGenericXdsConfigsCount();
assertThat(xdsConfigCount).isEqualTo(4); assertThat(xdsConfigCount).isEqualTo(4);
EnumMap<ResourceType, GenericXdsConfig> configDumps = mapConfigDumps(clientConfig); Map<XdsResourceType<?>, GenericXdsConfig> configDumps = mapConfigDumps(clientConfig);
assertThat(configDumps.keySet()).containsExactly(LDS, RDS, CDS, EDS); assertThat(configDumps.keySet()).containsExactly(LDS, RDS, CDS, EDS);
// LDS. // LDS.
@ -384,7 +383,7 @@ public class CsdsServiceTest {
private static void verifyClientConfigNoResources(ClientConfig clientConfig) { private static void verifyClientConfigNoResources(ClientConfig clientConfig) {
int xdsConfigCount = clientConfig.getGenericXdsConfigsCount(); int xdsConfigCount = clientConfig.getGenericXdsConfigsCount();
assertThat(xdsConfigCount).isEqualTo(0); assertThat(xdsConfigCount).isEqualTo(0);
EnumMap<ResourceType, GenericXdsConfig> configDumps = mapConfigDumps(clientConfig); Map<XdsResourceType<?>, GenericXdsConfig> configDumps = mapConfigDumps(clientConfig);
assertThat(configDumps).isEmpty(); assertThat(configDumps).isEmpty();
} }
@ -398,12 +397,12 @@ public class CsdsServiceTest {
assertThat(node).isEqualTo(BOOTSTRAP_NODE.toEnvoyProtoNode()); assertThat(node).isEqualTo(BOOTSTRAP_NODE.toEnvoyProtoNode());
} }
private static EnumMap<ResourceType, GenericXdsConfig> mapConfigDumps(ClientConfig config) { private static Map<XdsResourceType<?>, GenericXdsConfig> mapConfigDumps(ClientConfig config) {
EnumMap<ResourceType, GenericXdsConfig> xdsConfigMap = new EnumMap<>(ResourceType.class); Map<XdsResourceType<?>, GenericXdsConfig> xdsConfigMap = new HashMap<>();
List<GenericXdsConfig> xdsConfigList = config.getGenericXdsConfigsList(); List<GenericXdsConfig> xdsConfigList = config.getGenericXdsConfigsList();
for (GenericXdsConfig genericXdsConfig : xdsConfigList) { for (GenericXdsConfig genericXdsConfig : xdsConfigList) {
ResourceType type = ResourceType.fromTypeUrl(genericXdsConfig.getTypeUrl()); XdsResourceType<?> type = AbstractXdsClient.fromTypeUrl(genericXdsConfig.getTypeUrl());
assertThat(type).isNotEqualTo(ResourceType.UNKNOWN); assertThat(type).isNotNull();
assertThat(xdsConfigMap).doesNotContainKey(type); assertThat(xdsConfigMap).doesNotContainKey(type);
xdsConfigMap.put(type, genericXdsConfig); xdsConfigMap.put(type, genericXdsConfig);
} }
@ -411,12 +410,13 @@ public class CsdsServiceTest {
} }
private static class FakeXdsClient extends XdsClient { private static class FakeXdsClient extends XdsClient {
protected Map<ResourceType, Map<String, ResourceMetadata>> getSubscribedResourcesMetadata() { protected Map<XdsResourceType<?>, Map<String, ResourceMetadata>>
getSubscribedResourcesMetadata() {
return ImmutableMap.of(); return ImmutableMap.of();
} }
@Override @Override
ListenableFuture<Map<ResourceType, Map<String, ResourceMetadata>>> ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
getSubscribedResourcesMetadataSnapshot() { getSubscribedResourcesMetadataSnapshot() {
return Futures.immediateFuture(getSubscribedResourcesMetadata()); return Futures.immediateFuture(getSubscribedResourcesMetadata());
} }

View File

@ -115,7 +115,6 @@ import io.grpc.lookup.v1.GrpcKeyBuilder.Name;
import io.grpc.lookup.v1.NameMatcher; import io.grpc.lookup.v1.NameMatcher;
import io.grpc.lookup.v1.RouteLookupClusterSpecifier; import io.grpc.lookup.v1.RouteLookupClusterSpecifier;
import io.grpc.lookup.v1.RouteLookupConfig; import io.grpc.lookup.v1.RouteLookupConfig;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.ClusterSpecifierPlugin.NamedPluginConfig; import io.grpc.xds.ClusterSpecifierPlugin.NamedPluginConfig;
import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig; import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig;
@ -2699,21 +2698,28 @@ public class XdsClientImplDataTest {
@Test @Test
public void validateResourceName() { public void validateResourceName() {
String traditionalResource = "cluster1.google.com"; String traditionalResource = "cluster1.google.com";
assertThat(XdsClient.isResourceNameValid(traditionalResource, ResourceType.CDS.typeUrl())) assertThat(XdsClient.isResourceNameValid(traditionalResource,
XdsClusterResource.getInstance().typeUrl()))
.isTrue(); .isTrue();
assertThat(XdsClient.isResourceNameValid(traditionalResource, ResourceType.RDS.typeUrlV2())) assertThat(XdsClient.isResourceNameValid(traditionalResource,
XdsRouteConfigureResource.getInstance().typeUrlV2()))
.isTrue(); .isTrue();
String invalidPath = "xdstp:/abc/efg"; String invalidPath = "xdstp:/abc/efg";
assertThat(XdsClient.isResourceNameValid(invalidPath, ResourceType.CDS.typeUrl())).isFalse(); assertThat(XdsClient.isResourceNameValid(invalidPath,
XdsClusterResource.getInstance().typeUrl())).isFalse();
String invalidPath2 = "xdstp:///envoy.config.route.v3.RouteConfiguration"; String invalidPath2 = "xdstp:///envoy.config.route.v3.RouteConfiguration";
assertThat(XdsClient.isResourceNameValid(invalidPath2, ResourceType.RDS.typeUrl())).isFalse(); assertThat(XdsClient.isResourceNameValid(invalidPath2,
XdsRouteConfigureResource.getInstance().typeUrl())).isFalse();
String typeMatch = "xdstp:///envoy.config.route.v3.RouteConfiguration/foo/route1"; String typeMatch = "xdstp:///envoy.config.route.v3.RouteConfiguration/foo/route1";
assertThat(XdsClient.isResourceNameValid(typeMatch, ResourceType.LDS.typeUrl())).isFalse(); assertThat(XdsClient.isResourceNameValid(typeMatch,
assertThat(XdsClient.isResourceNameValid(typeMatch, ResourceType.RDS.typeUrl())).isTrue(); XdsListenerResource.getInstance().typeUrl())).isFalse();
assertThat(XdsClient.isResourceNameValid(typeMatch, ResourceType.RDS.typeUrlV2())).isFalse(); assertThat(XdsClient.isResourceNameValid(typeMatch,
XdsRouteConfigureResource.getInstance().typeUrl())).isTrue();
assertThat(XdsClient.isResourceNameValid(typeMatch,
XdsRouteConfigureResource.getInstance().typeUrlV2())).isFalse();
} }
@Test @Test

View File

@ -18,10 +18,6 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage; import static com.google.common.truth.Truth.assertWithMessage;
import static io.grpc.xds.AbstractXdsClient.ResourceType.CDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.EDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.LDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.RDS;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA; import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -66,7 +62,6 @@ import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.internal.TimeProvider; import io.grpc.internal.TimeProvider;
import io.grpc.testing.GrpcCleanupRule; import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.Bootstrapper.AuthorityInfo; import io.grpc.xds.Bootstrapper.AuthorityInfo;
import io.grpc.xds.Bootstrapper.CertificateProviderInfo; import io.grpc.xds.Bootstrapper.CertificateProviderInfo;
import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.Bootstrapper.ServerInfo;
@ -137,6 +132,10 @@ public abstract class XdsClientImplTestBase {
private static final Node NODE = Node.newBuilder().setId(NODE_ID).build(); private static final Node NODE = Node.newBuilder().setId(NODE_ID).build();
private static final Any FAILING_ANY = MessageFactory.FAILING_ANY; private static final Any FAILING_ANY = MessageFactory.FAILING_ANY;
private static final ChannelCredentials CHANNEL_CREDENTIALS = InsecureChannelCredentials.create(); private static final ChannelCredentials CHANNEL_CREDENTIALS = InsecureChannelCredentials.create();
private static final XdsResourceType<?> LDS = XdsListenerResource.getInstance();
private static final XdsResourceType<?> CDS = XdsClusterResource.getInstance();
private static final XdsResourceType<?> RDS = XdsRouteConfigureResource.getInstance();
private static final XdsResourceType<?> EDS = XdsEndpointResource.getInstance();
// xDS control plane server info. // xDS control plane server info.
private ServerInfo xdsServerInfo; private ServerInfo xdsServerInfo;
@ -399,15 +398,27 @@ public abstract class XdsClientImplTestBase {
private void verifySubscribedResourcesMetadataSizes( private void verifySubscribedResourcesMetadataSizes(
int ldsSize, int cdsSize, int rdsSize, int edsSize) { int ldsSize, int cdsSize, int rdsSize, int edsSize) {
Map<ResourceType, Map<String, ResourceMetadata>> subscribedResourcesMetadata = Map<XdsResourceType<?>, Map<String, ResourceMetadata>> subscribedResourcesMetadata =
awaitSubscribedResourcesMetadata(); awaitSubscribedResourcesMetadata();
assertThat(subscribedResourcesMetadata.get(LDS)).hasSize(ldsSize); verifyResourceCount(subscribedResourcesMetadata, LDS, ldsSize);
assertThat(subscribedResourcesMetadata.get(CDS)).hasSize(cdsSize); verifyResourceCount(subscribedResourcesMetadata, CDS, cdsSize);
assertThat(subscribedResourcesMetadata.get(RDS)).hasSize(rdsSize); verifyResourceCount(subscribedResourcesMetadata, RDS, rdsSize);
assertThat(subscribedResourcesMetadata.get(EDS)).hasSize(edsSize); verifyResourceCount(subscribedResourcesMetadata, EDS, edsSize);
} }
private Map<ResourceType, Map<String, ResourceMetadata>> awaitSubscribedResourcesMetadata() { private void verifyResourceCount(
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> subscribedResourcesMetadata,
XdsResourceType<?> type,
int size) {
if (size == 0) {
assertThat(subscribedResourcesMetadata.containsKey(type)).isFalse();
} else {
assertThat(subscribedResourcesMetadata.get(type)).hasSize(size);
}
}
private Map<XdsResourceType<?>, Map<String, ResourceMetadata>>
awaitSubscribedResourcesMetadata() {
try { try {
return xdsClient.getSubscribedResourcesMetadataSnapshot().get(20, TimeUnit.SECONDS); return xdsClient.getSubscribedResourcesMetadataSnapshot().get(20, TimeUnit.SECONDS);
} catch (Exception e) { } catch (Exception e) {
@ -419,20 +430,20 @@ public abstract class XdsClientImplTestBase {
} }
/** Verify the resource requested, but not updated. */ /** Verify the resource requested, but not updated. */
private void verifyResourceMetadataRequested(ResourceType type, String resourceName) { private void verifyResourceMetadataRequested(XdsResourceType<?> type, String resourceName) {
verifyResourceMetadata( verifyResourceMetadata(
type, resourceName, null, ResourceMetadataStatus.REQUESTED, "", 0, false); type, resourceName, null, ResourceMetadataStatus.REQUESTED, "", 0, false);
} }
/** Verify that the requested resource does not exist. */ /** Verify that the requested resource does not exist. */
private void verifyResourceMetadataDoesNotExist(ResourceType type, String resourceName) { private void verifyResourceMetadataDoesNotExist(XdsResourceType<?> type, String resourceName) {
verifyResourceMetadata( verifyResourceMetadata(
type, resourceName, null, ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, false); type, resourceName, null, ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, false);
} }
/** Verify the resource to be acked. */ /** Verify the resource to be acked. */
private void verifyResourceMetadataAcked( private void verifyResourceMetadataAcked(
ResourceType type, String resourceName, Any rawResource, String versionInfo, XdsResourceType<?> type, String resourceName, Any rawResource, String versionInfo,
long updateTimeNanos) { long updateTimeNanos) {
verifyResourceMetadata(type, resourceName, rawResource, ResourceMetadataStatus.ACKED, verifyResourceMetadata(type, resourceName, rawResource, ResourceMetadataStatus.ACKED,
versionInfo, updateTimeNanos, false); versionInfo, updateTimeNanos, false);
@ -443,7 +454,7 @@ public abstract class XdsClientImplTestBase {
* corresponding i-th element of {@code List<String> failedDetails}. * corresponding i-th element of {@code List<String> failedDetails}.
*/ */
private void verifyResourceMetadataNacked( private void verifyResourceMetadataNacked(
ResourceType type, String resourceName, Any rawResource, String versionInfo, XdsResourceType<?> type, String resourceName, Any rawResource, String versionInfo,
long updateTime, String failedVersion, long failedUpdateTimeNanos, long updateTime, String failedVersion, long failedUpdateTimeNanos,
List<String> failedDetails) { List<String> failedDetails) {
ResourceMetadata resourceMetadata = ResourceMetadata resourceMetadata =
@ -465,7 +476,7 @@ public abstract class XdsClientImplTestBase {
} }
private ResourceMetadata verifyResourceMetadata( private ResourceMetadata verifyResourceMetadata(
ResourceType type, String resourceName, Any rawResource, ResourceMetadataStatus status, XdsResourceType<?> type, String resourceName, Any rawResource, ResourceMetadataStatus status,
String versionInfo, long updateTimeNanos, boolean hasErrorState) { String versionInfo, long updateTimeNanos, boolean hasErrorState) {
ResourceMetadata metadata = awaitSubscribedResourcesMetadata().get(type).get(resourceName); ResourceMetadata metadata = awaitSubscribedResourcesMetadata().get(type).get(resourceName);
assertThat(metadata).isNotNull(); assertThat(metadata).isNotNull();
@ -1297,7 +1308,7 @@ public abstract class XdsClientImplTestBase {
RDS_RESOURCE, rdsResourceWatcher); RDS_RESOURCE, rdsResourceWatcher);
Any routeConfig = Any.pack(mf.buildRouteConfiguration("route-bar.googleapis.com", Any routeConfig = Any.pack(mf.buildRouteConfiguration("route-bar.googleapis.com",
mf.buildOpaqueVirtualHosts(2))); mf.buildOpaqueVirtualHosts(2)));
call.sendResponse(ResourceType.RDS, routeConfig, VERSION_1, "0000"); call.sendResponse(RDS, routeConfig, VERSION_1, "0000");
// Client sends an ACK RDS request. // Client sends an ACK RDS request.
call.verifyRequest(RDS, RDS_RESOURCE, VERSION_1, "0000", NODE); call.verifyRequest(RDS, RDS_RESOURCE, VERSION_1, "0000", NODE);
@ -1934,7 +1945,7 @@ public abstract class XdsClientImplTestBase {
mf.buildEdsCluster(CDS_RESOURCE, null, "least_request_experimental", null, mf.buildEdsCluster(CDS_RESOURCE, null, "least_request_experimental", null,
leastRequestConfig, false, null, "envoy.transport_sockets.tls", null, null leastRequestConfig, false, null, "envoy.transport_sockets.tls", null, null
)); ));
call.sendResponse(ResourceType.CDS, clusterRingHash, VERSION_1, "0000"); call.sendResponse(CDS, clusterRingHash, VERSION_1, "0000");
// Client sent an ACK CDS request. // Client sent an ACK CDS request.
call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE); call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
@ -1966,7 +1977,7 @@ public abstract class XdsClientImplTestBase {
mf.buildEdsCluster(CDS_RESOURCE, null, "ring_hash_experimental", ringHashConfig, null, mf.buildEdsCluster(CDS_RESOURCE, null, "ring_hash_experimental", ringHashConfig, null,
false, null, "envoy.transport_sockets.tls", null, null false, null, "envoy.transport_sockets.tls", null, null
)); ));
call.sendResponse(ResourceType.CDS, clusterRingHash, VERSION_1, "0000"); call.sendResponse(CDS, clusterRingHash, VERSION_1, "0000");
// Client sent an ACK CDS request. // Client sent an ACK CDS request.
call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE); call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
@ -3353,10 +3364,9 @@ public abstract class XdsClientImplTestBase {
Message listener = Message listener =
mf.buildListenerWithFilterChain(LISTENER_RESOURCE, 7000, "0.0.0.0", filterChain); mf.buildListenerWithFilterChain(LISTENER_RESOURCE, 7000, "0.0.0.0", filterChain);
List<Any> listeners = ImmutableList.of(Any.pack(listener)); List<Any> listeners = ImmutableList.of(Any.pack(listener));
call.sendResponse(ResourceType.LDS, listeners, "0", "0000"); call.sendResponse(LDS, listeners, "0", "0000");
// Client sends an ACK LDS request. // Client sends an ACK LDS request.
call.verifyRequest( call.verifyRequest(LDS, Collections.singletonList(LISTENER_RESOURCE), "0", "0000", NODE);
ResourceType.LDS, Collections.singletonList(LISTENER_RESOURCE), "0", "0000", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
EnvoyServerProtoData.Listener parsedListener = ldsUpdateCaptor.getValue().listener(); EnvoyServerProtoData.Listener parsedListener = ldsUpdateCaptor.getValue().listener();
assertThat(parsedListener.name()).isEqualTo(LISTENER_RESOURCE); assertThat(parsedListener.name()).isEqualTo(LISTENER_RESOURCE);
@ -3390,10 +3400,9 @@ public abstract class XdsClientImplTestBase {
Message listener = mf.buildListenerWithFilterChain( Message listener = mf.buildListenerWithFilterChain(
"grpc/server?xds.resource.listening_address=0.0.0.0:8000", 7000, "0.0.0.0", filterChain); "grpc/server?xds.resource.listening_address=0.0.0.0:8000", 7000, "0.0.0.0", filterChain);
List<Any> listeners = ImmutableList.of(Any.pack(listener)); List<Any> listeners = ImmutableList.of(Any.pack(listener));
call.sendResponse(ResourceType.LDS, listeners, "0", "0000"); call.sendResponse(LDS, listeners, "0", "0000");
// Client sends an ACK LDS request. // Client sends an ACK LDS request.
call.verifyRequest( call.verifyRequest(LDS, Collections.singletonList(LISTENER_RESOURCE), "0", "0000", NODE);
ResourceType.LDS, Collections.singletonList(LISTENER_RESOURCE), "0", "0000", NODE);
verifyNoInteractions(ldsResourceWatcher); verifyNoInteractions(ldsResourceWatcher);
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
@ -3418,7 +3427,7 @@ public abstract class XdsClientImplTestBase {
Message listener = Message listener =
mf.buildListenerWithFilterChain(LISTENER_RESOURCE, 7000, "0.0.0.0", filterChain); mf.buildListenerWithFilterChain(LISTENER_RESOURCE, 7000, "0.0.0.0", filterChain);
List<Any> listeners = ImmutableList.of(Any.pack(listener)); List<Any> listeners = ImmutableList.of(Any.pack(listener));
call.sendResponse(ResourceType.LDS, listeners, "0", "0000"); call.sendResponse(LDS, listeners, "0", "0000");
// The response NACKed with errors indicating indices of the failed resources. // The response NACKed with errors indicating indices of the failed resources.
String errorMsg = "LDS response Listener \'grpc/server?xds.resource.listening_address=" String errorMsg = "LDS response Listener \'grpc/server?xds.resource.listening_address="
+ "0.0.0.0:7000\' validation error: " + "0.0.0.0:7000\' validation error: "
@ -3445,7 +3454,7 @@ public abstract class XdsClientImplTestBase {
Message listener = Message listener =
mf.buildListenerWithFilterChain(LISTENER_RESOURCE, 7000, "0.0.0.0", filterChain); mf.buildListenerWithFilterChain(LISTENER_RESOURCE, 7000, "0.0.0.0", filterChain);
List<Any> listeners = ImmutableList.of(Any.pack(listener)); List<Any> listeners = ImmutableList.of(Any.pack(listener));
call.sendResponse(ResourceType.LDS, listeners, "0", "0000"); call.sendResponse(LDS, listeners, "0", "0000");
// The response NACKed with errors indicating indices of the failed resources. // The response NACKed with errors indicating indices of the failed resources.
String errorMsg = "LDS response Listener \'grpc/server?xds.resource.listening_address=" String errorMsg = "LDS response Listener \'grpc/server?xds.resource.listening_address="
+ "0.0.0.0:7000\' validation error: " + "0.0.0.0:7000\' validation error: "
@ -3460,28 +3469,27 @@ public abstract class XdsClientImplTestBase {
XdsResourceType<T> type, String name, ResourceWatcher<T> watcher) { XdsResourceType<T> type, String name, ResourceWatcher<T> watcher) {
FakeClock.TaskFilter timeoutTaskFilter; FakeClock.TaskFilter timeoutTaskFilter;
switch (type.typeName()) { switch (type.typeName()) {
case LDS: case "LDS":
timeoutTaskFilter = LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; timeoutTaskFilter = LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER;
xdsClient.watchXdsResource(type, name, watcher); xdsClient.watchXdsResource(type, name, watcher);
break; break;
case RDS: case "RDS":
timeoutTaskFilter = RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; timeoutTaskFilter = RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER;
xdsClient.watchXdsResource(type, name, watcher); xdsClient.watchXdsResource(type, name, watcher);
break; break;
case CDS: case "CDS":
timeoutTaskFilter = CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; timeoutTaskFilter = CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER;
xdsClient.watchXdsResource(type, name, watcher); xdsClient.watchXdsResource(type, name, watcher);
break; break;
case EDS: case "EDS":
timeoutTaskFilter = EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; timeoutTaskFilter = EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER;
xdsClient.watchXdsResource(type, name, watcher); xdsClient.watchXdsResource(type, name, watcher);
break; break;
case UNKNOWN:
default: default:
throw new AssertionError("should never be here"); throw new AssertionError("should never be here");
} }
DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
call.verifyRequest(type.typeName(), Collections.singletonList(name), "", "", NODE); call.verifyRequest(type, Collections.singletonList(name), "", "", NODE);
ScheduledTask timeoutTask = ScheduledTask timeoutTask =
Iterables.getOnlyElement(fakeClock.getPendingTasks(timeoutTaskFilter)); Iterables.getOnlyElement(fakeClock.getPendingTasks(timeoutTaskFilter));
assertThat(timeoutTask.getDelay(TimeUnit.SECONDS)) assertThat(timeoutTask.getDelay(TimeUnit.SECONDS))
@ -3492,19 +3500,20 @@ public abstract class XdsClientImplTestBase {
protected abstract static class DiscoveryRpcCall { protected abstract static class DiscoveryRpcCall {
protected abstract void verifyRequest( protected abstract void verifyRequest(
ResourceType type, List<String> resources, String versionInfo, String nonce, Node node); XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
Node node);
protected void verifyRequest( protected void verifyRequest(
ResourceType type, String resource, String versionInfo, String nonce, Node node) { XdsResourceType<?> type, String resource, String versionInfo, String nonce, Node node) {
verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node); verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node);
} }
protected abstract void verifyRequestNack( protected abstract void verifyRequestNack(
ResourceType type, List<String> resources, String versionInfo, String nonce, Node node, XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
List<String> errorMessages); Node node, List<String> errorMessages);
protected void verifyRequestNack( protected void verifyRequestNack(
ResourceType type, String resource, String versionInfo, String nonce, Node node, XdsResourceType<?> type, String resource, String versionInfo, String nonce, Node node,
List<String> errorMessages) { List<String> errorMessages) {
verifyRequestNack(type, ImmutableList.of(resource), versionInfo, nonce, node, errorMessages); verifyRequestNack(type, ImmutableList.of(resource), versionInfo, nonce, node, errorMessages);
} }
@ -3512,9 +3521,10 @@ public abstract class XdsClientImplTestBase {
protected abstract void verifyNoMoreRequest(); protected abstract void verifyNoMoreRequest();
protected abstract void sendResponse( protected abstract void sendResponse(
ResourceType type, List<Any> resources, String versionInfo, String nonce); XdsResourceType<?> type, List<Any> resources, String versionInfo, String nonce);
protected void sendResponse(ResourceType type, Any resource, String versionInfo, String nonce) { protected void sendResponse(XdsResourceType<?> type, Any resource, String versionInfo,
String nonce) {
sendResponse(type, ImmutableList.of(resource), versionInfo, nonce); sendResponse(type, ImmutableList.of(resource), versionInfo, nonce);
} }

View File

@ -92,7 +92,6 @@ import io.grpc.Context;
import io.grpc.Context.CancellationListener; import io.grpc.Context.CancellationListener;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -197,7 +196,7 @@ public class XdsClientImplV2Test extends XdsClientImplTestBase {
@Override @Override
protected void verifyRequest( protected void verifyRequest(
ResourceType type, List<String> resources, String versionInfo, String nonce, XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
EnvoyProtoData.Node node) { EnvoyProtoData.Node node) {
verify(requestObserver).onNext(argThat(new DiscoveryRequestMatcher( verify(requestObserver).onNext(argThat(new DiscoveryRequestMatcher(
node.toEnvoyProtoNodeV2(), versionInfo, resources, type.typeUrlV2(), nonce, null, null))); node.toEnvoyProtoNodeV2(), versionInfo, resources, type.typeUrlV2(), nonce, null, null)));
@ -205,7 +204,7 @@ public class XdsClientImplV2Test extends XdsClientImplTestBase {
@Override @Override
protected void verifyRequestNack( protected void verifyRequestNack(
ResourceType type, List<String> resources, String versionInfo, String nonce, XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
EnvoyProtoData.Node node, List<String> errorMessages) { EnvoyProtoData.Node node, List<String> errorMessages) {
verify(requestObserver).onNext(argThat(new DiscoveryRequestMatcher( verify(requestObserver).onNext(argThat(new DiscoveryRequestMatcher(
node.toEnvoyProtoNodeV2(), versionInfo, resources, type.typeUrlV2(), nonce, node.toEnvoyProtoNodeV2(), versionInfo, resources, type.typeUrlV2(), nonce,
@ -219,7 +218,7 @@ public class XdsClientImplV2Test extends XdsClientImplTestBase {
@Override @Override
protected void sendResponse( protected void sendResponse(
ResourceType type, List<Any> resources, String versionInfo, String nonce) { XdsResourceType<?> type, List<Any> resources, String versionInfo, String nonce) {
DiscoveryResponse response = DiscoveryResponse response =
DiscoveryResponse.newBuilder() DiscoveryResponse.newBuilder()
.setVersionInfo(versionInfo) .setVersionInfo(versionInfo)

View File

@ -99,7 +99,6 @@ import io.grpc.Context;
import io.grpc.Context.CancellationListener; import io.grpc.Context.CancellationListener;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -205,7 +204,7 @@ public class XdsClientImplV3Test extends XdsClientImplTestBase {
@Override @Override
protected void verifyRequest( protected void verifyRequest(
ResourceType type, List<String> resources, String versionInfo, String nonce, XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
EnvoyProtoData.Node node) { EnvoyProtoData.Node node) {
verify(requestObserver).onNext(argThat(new DiscoveryRequestMatcher( verify(requestObserver).onNext(argThat(new DiscoveryRequestMatcher(
node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce, null, null))); node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce, null, null)));
@ -213,7 +212,7 @@ public class XdsClientImplV3Test extends XdsClientImplTestBase {
@Override @Override
protected void verifyRequestNack( protected void verifyRequestNack(
ResourceType type, List<String> resources, String versionInfo, String nonce, XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
EnvoyProtoData.Node node, List<String> errorMessages) { EnvoyProtoData.Node node, List<String> errorMessages) {
verify(requestObserver).onNext(argThat(new DiscoveryRequestMatcher( verify(requestObserver).onNext(argThat(new DiscoveryRequestMatcher(
node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce, node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce,
@ -227,7 +226,7 @@ public class XdsClientImplV3Test extends XdsClientImplTestBase {
@Override @Override
protected void sendResponse( protected void sendResponse(
ResourceType type, List<Any> resources, String versionInfo, String nonce) { XdsResourceType<?> type, List<Any> resources, String versionInfo, String nonce) {
DiscoveryResponse response = DiscoveryResponse response =
DiscoveryResponse.newBuilder() DiscoveryResponse.newBuilder()
.setVersionInfo(versionInfo) .setVersionInfo(versionInfo)

View File

@ -2089,14 +2089,14 @@ public class XdsNameResolverTest {
ResourceWatcher<T> watcher) { ResourceWatcher<T> watcher) {
switch (resourceType.typeName()) { switch (resourceType.typeName()) {
case LDS: case "LDS":
assertThat(ldsResource).isNull(); assertThat(ldsResource).isNull();
assertThat(ldsWatcher).isNull(); assertThat(ldsWatcher).isNull();
assertThat(resourceName).isEqualTo(expectedLdsResourceName); assertThat(resourceName).isEqualTo(expectedLdsResourceName);
ldsResource = resourceName; ldsResource = resourceName;
ldsWatcher = (ResourceWatcher<LdsUpdate>) watcher; ldsWatcher = (ResourceWatcher<LdsUpdate>) watcher;
break; break;
case RDS: case "RDS":
assertThat(rdsResource).isNull(); assertThat(rdsResource).isNull();
assertThat(rdsWatcher).isNull(); assertThat(rdsWatcher).isNull();
rdsResource = resourceName; rdsResource = resourceName;
@ -2111,14 +2111,14 @@ public class XdsNameResolverTest {
String resourceName, String resourceName,
ResourceWatcher<T> watcher) { ResourceWatcher<T> watcher) {
switch (type.typeName()) { switch (type.typeName()) {
case LDS: case "LDS":
assertThat(ldsResource).isNotNull(); assertThat(ldsResource).isNotNull();
assertThat(ldsWatcher).isNotNull(); assertThat(ldsWatcher).isNotNull();
assertThat(resourceName).isEqualTo(expectedLdsResourceName); assertThat(resourceName).isEqualTo(expectedLdsResourceName);
ldsResource = null; ldsResource = null;
ldsWatcher = null; ldsWatcher = null;
break; break;
case RDS: case "RDS":
assertThat(rdsResource).isNotNull(); assertThat(rdsResource).isNotNull();
assertThat(rdsWatcher).isNotNull(); assertThat(rdsWatcher).isNotNull();
rdsResource = null; rdsResource = null;

View File

@ -184,12 +184,12 @@ public class XdsServerTestHelper {
String resourceName, String resourceName,
ResourceWatcher<T> watcher) { ResourceWatcher<T> watcher) {
switch (resourceType.typeName()) { switch (resourceType.typeName()) {
case LDS: case "LDS":
assertThat(ldsWatcher).isNull(); assertThat(ldsWatcher).isNull();
ldsWatcher = (ResourceWatcher<LdsUpdate>) watcher; ldsWatcher = (ResourceWatcher<LdsUpdate>) watcher;
ldsResource.set(resourceName); ldsResource.set(resourceName);
break; break;
case RDS: case "RDS":
//re-register is not allowed. //re-register is not allowed.
assertThat(rdsWatchers.put(resourceName, (ResourceWatcher<RdsUpdate>)watcher)).isNull(); assertThat(rdsWatchers.put(resourceName, (ResourceWatcher<RdsUpdate>)watcher)).isNull();
rdsCount.countDown(); rdsCount.countDown();
@ -203,12 +203,12 @@ public class XdsServerTestHelper {
String resourceName, String resourceName,
ResourceWatcher<T> watcher) { ResourceWatcher<T> watcher) {
switch (type.typeName()) { switch (type.typeName()) {
case LDS: case "LDS":
assertThat(ldsWatcher).isNotNull(); assertThat(ldsWatcher).isNotNull();
ldsResource = null; ldsResource = null;
ldsWatcher = null; ldsWatcher = null;
break; break;
case RDS: case "RDS":
rdsWatchers.remove(resourceName); rdsWatchers.remove(resourceName);
break; break;
default: default: