xds: drop xds v2 support (#9760)

This commit is contained in:
yifeizhuang 2022-12-28 10:35:44 -08:00 committed by GitHub
parent f5e8459748
commit 3c5c2be712
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 195 additions and 1176 deletions

View File

@ -227,11 +227,7 @@ final class AbstractXdsClient {
// Must be synchronized.
private void startRpcStream() {
checkState(adsStream == null, "Previous adsStream has not been cleared yet");
if (serverInfo.useProtocolV3()) {
adsStream = new AdsStreamV3();
} else {
adsStream = new AdsStreamV2();
}
adsStream = new AdsStreamV3();
Context prevContext = context.attach();
try {
adsStream.start();
@ -364,102 +360,6 @@ final class AbstractXdsClient {
}
}
private final class AdsStreamV2 extends AbstractAdsStream {
private StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryRequest> requestWriter;
@Override
public boolean isReady() {
return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
}
@Override
void start() {
io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc
.AggregatedDiscoveryServiceStub stub =
io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.newStub(channel);
StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryResponse> responseReaderV2 =
new ClientResponseObserver<io.envoyproxy.envoy.api.v2.DiscoveryRequest,
io.envoyproxy.envoy.api.v2.DiscoveryResponse>() {
@Override
public void beforeStart(
ClientCallStreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryRequest> reqStream) {
reqStream.setOnReadyHandler(AbstractXdsClient.this::readyHandler);
}
@Override
public void onNext(final io.envoyproxy.envoy.api.v2.DiscoveryResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(
XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
MessagePrinter.print(response));
}
handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
response.getNonce());
}
});
}
@Override
public void onError(final Throwable t) {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcError(t);
}
});
}
@Override
public void onCompleted() {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcCompleted();
}
});
}
};
requestWriter = stub.withWaitForReady().streamAggregatedResources(responseReaderV2);
}
@Override
void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
Collection<String> resources, String nonce,
@Nullable String errorDetail) {
checkState(requestWriter != null, "ADS stream has not been started");
io.envoyproxy.envoy.api.v2.DiscoveryRequest.Builder builder =
io.envoyproxy.envoy.api.v2.DiscoveryRequest.newBuilder()
.setVersionInfo(versionInfo)
.setNode(bootstrapNode.toEnvoyProtoNodeV2())
.addAllResourceNames(resources)
.setTypeUrl(type.typeUrlV2())
.setResponseNonce(nonce);
if (errorDetail != null) {
com.google.rpc.Status error =
com.google.rpc.Status.newBuilder()
.setCode(Code.INVALID_ARGUMENT_VALUE) // FIXME(chengyuanzhang): use correct code
.setMessage(errorDetail)
.build();
builder.setErrorDetail(error);
}
io.envoyproxy.envoy.api.v2.DiscoveryRequest request = builder.build();
requestWriter.onNext(request);
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", MessagePrinter.print(request));
}
}
@Override
void sendError(Exception error) {
requestWriter.onError(error);
}
}
private final class AdsStreamV3 extends AbstractAdsStream {
private StreamObserver<DiscoveryRequest> requestWriter;

View File

@ -60,22 +60,19 @@ public abstract class Bootstrapper {
abstract ChannelCredentials channelCredentials();
abstract boolean useProtocolV3();
abstract boolean ignoreResourceDeletion();
@VisibleForTesting
static ServerInfo create(
String target, ChannelCredentials channelCredentials, boolean useProtocolV3) {
return new AutoValue_Bootstrapper_ServerInfo(target, channelCredentials, useProtocolV3,
false);
String target, ChannelCredentials channelCredentials) {
return new AutoValue_Bootstrapper_ServerInfo(target, channelCredentials, false);
}
@VisibleForTesting
static ServerInfo create(
String target, ChannelCredentials channelCredentials, boolean useProtocolV3,
String target, ChannelCredentials channelCredentials,
boolean ignoreResourceDeletion) {
return new AutoValue_Bootstrapper_ServerInfo(target, channelCredentials, useProtocolV3,
return new AutoValue_Bootstrapper_ServerInfo(target, channelCredentials,
ignoreResourceDeletion);
}
}

View File

@ -68,7 +68,6 @@ class BootstrapperImpl extends Bootstrapper {
static final String CLIENT_FEATURE_RESOURCE_IN_SOTW = "xds.config.resource-in-sotw";
// Server features.
private static final String SERVER_FEATURE_XDS_V3 = "xds_v3";
private static final String SERVER_FEATURE_IGNORE_RESOURCE_DELETION = "ignore_resource_deletion";
private final XdsLogger logger;
@ -281,16 +280,14 @@ class BootstrapperImpl extends Bootstrapper {
"Server " + serverUri + ": no supported channel credentials found");
}
boolean useProtocolV3 = false;
boolean ignoreResourceDeletion = false;
List<String> serverFeatures = JsonUtil.getListOfStrings(serverConfig, "server_features");
if (serverFeatures != null) {
logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
useProtocolV3 = serverFeatures.contains(SERVER_FEATURE_XDS_V3);
ignoreResourceDeletion = serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION);
}
servers.add(
ServerInfo.create(serverUri, channelCredentials, useProtocolV3, ignoreResourceDeletion));
ServerInfo.create(serverUri, channelCredentials, ignoreResourceDeletion));
}
return servers.build();
}

View File

@ -270,38 +270,6 @@ final class EnvoyProtoData {
builder.addAllClientFeatures(clientFeatures);
return builder.build();
}
@SuppressWarnings("deprecation") // Deprecated v2 API setBuildVersion().
public io.envoyproxy.envoy.api.v2.core.Node toEnvoyProtoNodeV2() {
io.envoyproxy.envoy.api.v2.core.Node.Builder builder =
io.envoyproxy.envoy.api.v2.core.Node.newBuilder();
builder.setId(id);
builder.setCluster(cluster);
if (metadata != null) {
Struct.Builder structBuilder = Struct.newBuilder();
for (Map.Entry<String, ?> entry : metadata.entrySet()) {
structBuilder.putFields(entry.getKey(), convertToValue(entry.getValue()));
}
builder.setMetadata(structBuilder);
}
if (locality != null) {
builder.setLocality(
io.envoyproxy.envoy.api.v2.core.Locality.newBuilder()
.setRegion(locality.region())
.setZone(locality.zone())
.setSubZone(locality.subZone()));
}
for (Address address : listeningAddresses) {
builder.addListeningAddresses(address.toEnvoyProtoAddressV2());
}
builder.setBuildVersion(buildVersion);
builder.setUserAgentName(userAgentName);
if (userAgentVersion != null) {
builder.setUserAgentVersion(userAgentVersion);
}
builder.addAllClientFeatures(clientFeatures);
return builder.build();
}
}
/**

View File

@ -57,7 +57,6 @@ final class LoadReportClient {
private final XdsLogger logger;
private final Channel channel;
private final Context context;
private final boolean useProtocolV3;
private final Node node;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timerService;
@ -77,7 +76,6 @@ final class LoadReportClient {
LoadStatsManager2 loadStatsManager,
Channel channel,
Context context,
boolean useProtocolV3,
Node node,
SynchronizationContext syncContext,
ScheduledExecutorService scheduledExecutorService,
@ -86,7 +84,6 @@ final class LoadReportClient {
this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager");
this.channel = checkNotNull(channel, "xdsChannel");
this.context = checkNotNull(context, "context");
this.useProtocolV3 = useProtocolV3;
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timerService = checkNotNull(scheduledExecutorService, "timeService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
@ -161,11 +158,7 @@ final class LoadReportClient {
return;
}
checkState(lrsStream == null, "previous lbStream has not been cleared yet");
if (useProtocolV3) {
lrsStream = new LrsStreamV3();
} else {
lrsStream = new LrsStreamV2();
}
lrsStream = new LrsStreamV3();
retryStopwatch.reset().start();
Context prevContext = context.attach();
try {
@ -175,6 +168,8 @@ final class LoadReportClient {
}
}
// TODO(zivy@): The abstract class was used to support xds v2 and v3. Remove abstract here since
// v2 is dropped and v3 is the only supported version now.
private abstract class LrsStream {
boolean initialResponseReceived;
boolean closed;
@ -298,105 +293,6 @@ final class LoadReportClient {
}
}
private final class LrsStreamV2 extends LrsStream {
StreamObserver<io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest> lrsRequestWriterV2;
@Override
void start() {
StreamObserver<io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse>
lrsResponseReaderV2 =
new StreamObserver<io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse>() {
@Override
public void onNext(
final io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
logger.log(XdsLogLevel.DEBUG, "Received LoadStatsResponse:\n{0}", response);
handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
Durations.toNanos(response.getLoadReportingInterval()));
}
});
}
@Override
public void onError(final Throwable t) {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcError(t);
}
});
}
@Override
public void onCompleted() {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcCompleted();
}
});
}
};
io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc.LoadReportingServiceStub
stubV2 = io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc.newStub(
channel);
lrsRequestWriterV2 = stubV2.withWaitForReady().streamLoadStats(lrsResponseReaderV2);
logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
}
@Override
void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest.Builder requestBuilder =
io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest.newBuilder()
.setNode(node.toEnvoyProtoNodeV2());
for (ClusterStats stats : clusterStatsList) {
requestBuilder.addClusterStats(buildClusterStats(stats));
}
io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest request = requestBuilder.build();
lrsRequestWriterV2.onNext(requestBuilder.build());
logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
}
@Override
void sendError(Exception error) {
lrsRequestWriterV2.onError(error);
}
private io.envoyproxy.envoy.api.v2.endpoint.ClusterStats buildClusterStats(
ClusterStats stats) {
io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.Builder builder =
io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.newBuilder()
.setClusterName(stats.clusterName());
if (stats.clusterServiceName() != null) {
builder.setClusterServiceName(stats.clusterServiceName());
}
for (UpstreamLocalityStats upstreamLocalityStats : stats.upstreamLocalityStatsList()) {
builder.addUpstreamLocalityStats(
io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats.newBuilder()
.setLocality(
io.envoyproxy.envoy.api.v2.core.Locality.newBuilder()
.setRegion(upstreamLocalityStats.locality().region())
.setZone(upstreamLocalityStats.locality().zone())
.setSubZone(upstreamLocalityStats.locality().subZone()))
.setTotalSuccessfulRequests(upstreamLocalityStats.totalSuccessfulRequests())
.setTotalErrorRequests(upstreamLocalityStats.totalErrorRequests())
.setTotalRequestsInProgress(upstreamLocalityStats.totalRequestsInProgress())
.setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests()));
}
for (DroppedRequests droppedRequests : stats.droppedRequestsList()) {
builder.addDroppedRequests(
io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests.newBuilder()
.setCategory(droppedRequests.category())
.setDroppedCount(droppedRequests.droppedCount()));
}
return builder.setTotalDroppedRequests(stats.totalDroppedRequests())
.setLoadReportInterval(Durations.fromNanos(stats.loadReportIntervalNano())).build();
}
}
private final class LrsStreamV3 extends LrsStream {
StreamObserver<LoadStatsRequest> lrsRequestWriterV3;

View File

@ -157,8 +157,8 @@ final class XdsClientImpl extends XdsClient
stopwatchSupplier,
this);
LoadReportClient lrsClient = new LoadReportClient(
loadStatsManager, xdsChannel.channel(), context, serverInfo.useProtocolV3(),
bootstrapInfo.node(), syncContext, timeService, backoffPolicyProvider, stopwatchSupplier);
loadStatsManager, xdsChannel.channel(), context, bootstrapInfo.node(), syncContext,
timeService, backoffPolicyProvider, stopwatchSupplier);
serverChannelMap.put(serverInfo, xdsChannel);
serverLrsClientMap.put(serverInfo, lrsClient);
}
@ -302,7 +302,6 @@ final class XdsClientImpl extends XdsClient
if (!resourceSubscribers.containsKey(type)) {
resourceSubscribers.put(type, new HashMap<>());
subscribedResourceTypeUrls.put(type.typeUrl(), type);
subscribedResourceTypeUrls.put(type.typeUrlV2(), type);
}
ResourceSubscriber<T> subscriber =
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
@ -334,7 +333,6 @@ final class XdsClientImpl extends XdsClient
subscriber.cancelResourceWatch();
resourceSubscribers.get(type).remove(resourceName);
subscribedResourceTypeUrls.remove(type.typeUrl());
subscribedResourceTypeUrls.remove(type.typeUrlV2());
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(type);
}

View File

@ -50,7 +50,6 @@ import java.util.Set;
import javax.annotation.Nullable;
class XdsClusterResource extends XdsResourceType<CdsUpdate> {
static final String ADS_TYPE_URL_CDS_V2 = "type.googleapis.com/envoy.api.v2.Cluster";
static final String ADS_TYPE_URL_CDS =
"type.googleapis.com/envoy.config.cluster.v3.Cluster";
private static final String TYPE_URL_UPSTREAM_TLS_CONTEXT =
@ -83,11 +82,6 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
return ADS_TYPE_URL_CDS;
}
@Override
String typeUrlV2() {
return ADS_TYPE_URL_CDS_V2;
}
@Override
boolean isFullStateOfTheWorld() {
return true;
@ -100,7 +94,7 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
}
@Override
CdsUpdate doParse(Args args, Message unpackedMessage, boolean isResourceV3)
CdsUpdate doParse(Args args, Message unpackedMessage)
throws ResourceInvalidException {
if (!(unpackedMessage instanceof Cluster)) {
throw new ResourceInvalidException("Invalid message type: " + unpackedMessage.getClass());
@ -167,7 +161,7 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
try {
clusterConfig = unpackCompatibleType(customType.getTypedConfig(),
io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig.class,
TYPE_URL_CLUSTER_CONFIG, TYPE_URL_CLUSTER_CONFIG_V2);
TYPE_URL_CLUSTER_CONFIG, null);
} catch (InvalidProtocolBufferException e) {
return StructOrError.fromError("Cluster " + clusterName + ": malformed ClusterConfig: " + e);
}

View File

@ -43,8 +43,6 @@ import java.util.Set;
import javax.annotation.Nullable;
class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
static final String ADS_TYPE_URL_EDS_V2 =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
static final String ADS_TYPE_URL_EDS =
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
@ -73,11 +71,6 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
return ADS_TYPE_URL_EDS;
}
@Override
String typeUrlV2() {
return ADS_TYPE_URL_EDS_V2;
}
@Override
boolean isFullStateOfTheWorld() {
return false;
@ -89,7 +82,7 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
}
@Override
EdsUpdate doParse(Args args, Message unpackedMessage, boolean isResourceV3)
EdsUpdate doParse(Args args, Message unpackedMessage)
throws ResourceInvalidException {
if (!(unpackedMessage instanceof ClusterLoadAssignment)) {
throw new ResourceInvalidException("Invalid message type: " + unpackedMessage.getClass());

View File

@ -52,12 +52,8 @@ import java.util.Set;
import javax.annotation.Nullable;
class XdsListenerResource extends XdsResourceType<LdsUpdate> {
static final String ADS_TYPE_URL_LDS_V2 = "type.googleapis.com/envoy.api.v2.Listener";
static final String ADS_TYPE_URL_LDS =
"type.googleapis.com/envoy.config.listener.v3.Listener";
private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 =
"type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2"
+ ".HttpConnectionManager";
static final String TYPE_URL_HTTP_CONNECTION_MANAGER =
"type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3"
+ ".HttpConnectionManager";
@ -92,18 +88,13 @@ class XdsListenerResource extends XdsResourceType<LdsUpdate> {
return ADS_TYPE_URL_LDS;
}
@Override
String typeUrlV2() {
return ADS_TYPE_URL_LDS_V2;
}
@Override
boolean isFullStateOfTheWorld() {
return true;
}
@Override
LdsUpdate doParse(Args args, Message unpackedMessage, boolean isResourceV3)
LdsUpdate doParse(Args args, Message unpackedMessage)
throws ResourceInvalidException {
if (!(unpackedMessage instanceof Listener)) {
throw new ResourceInvalidException("Invalid message type: " + unpackedMessage.getClass());
@ -112,10 +103,10 @@ class XdsListenerResource extends XdsResourceType<LdsUpdate> {
if (listener.hasApiListener()) {
return processClientSideListener(
listener, args, enableFaultInjection && isResourceV3);
listener, args, enableFaultInjection);
} else {
return processServerSideListener(
listener, args, enableRbac && isResourceV3);
listener, args, enableRbac);
}
}
@ -126,7 +117,7 @@ class XdsListenerResource extends XdsResourceType<LdsUpdate> {
try {
hcm = unpackCompatibleType(
listener.getApiListener().getApiListener(), HttpConnectionManager.class,
TYPE_URL_HTTP_CONNECTION_MANAGER, TYPE_URL_HTTP_CONNECTION_MANAGER_V2);
TYPE_URL_HTTP_CONNECTION_MANAGER, null);
} catch (InvalidProtocolBufferException e) {
throw new ResourceInvalidException(
"Could not parse HttpConnectionManager config from ApiListener", e);

View File

@ -202,8 +202,7 @@ final class XdsNameResolver extends NameResolver {
}
String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl())
&& !XdsClient.isResourceNameValid(ldsResourceName,
XdsListenerResource.getInstance().typeUrlV2())) {
) {
listener.onError(Status.INVALID_ARGUMENT.withDescription(
"invalid listener resource URI for service authority: " + serviceAuthority));
return;

View File

@ -39,8 +39,7 @@ import java.util.Set;
import javax.annotation.Nullable;
abstract class XdsResourceType<T extends ResourceUpdate> {
static final String TYPE_URL_RESOURCE_V2 = "type.googleapis.com/envoy.api.v2.Resource";
static final String TYPE_URL_RESOURCE_V3 =
static final String TYPE_URL_RESOURCE =
"type.googleapis.com/envoy.service.discovery.v3.Resource";
static final String TRANSPORT_SOCKET_NAME_TLS = "envoy.transport_sockets.tls";
@VisibleForTesting
@ -65,8 +64,6 @@ abstract class XdsResourceType<T extends ResourceUpdate> {
@VisibleForTesting
static boolean enableOutlierDetection = getFlag("GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION",
true);
static final String TYPE_URL_CLUSTER_CONFIG_V2 =
"type.googleapis.com/envoy.config.cluster.aggregate.v2alpha.ClusterConfig";
static final String TYPE_URL_CLUSTER_CONFIG =
"type.googleapis.com/envoy.extensions.clusters.aggregate.v3.ClusterConfig";
static final String TYPE_URL_TYPED_STRUCT_UDPA =
@ -83,8 +80,6 @@ abstract class XdsResourceType<T extends ResourceUpdate> {
abstract String typeUrl();
abstract String typeUrlV2();
// Do not confuse with the SotW approach: it is the mechanism in which the client must specify all
// resource names it is interested in with each request. Different resource types may behave
// differently in this approach. For LDS and CDS resources, the server must return all resources
@ -132,13 +127,10 @@ abstract class XdsResourceType<T extends ResourceUpdate> {
for (int i = 0; i < resources.size(); i++) {
Any resource = resources.get(i);
boolean isResourceV3;
Message unpackedMessage;
try {
resource = maybeUnwrapResources(resource);
isResourceV3 = resource.getTypeUrl().equals(typeUrl());
unpackedMessage = unpackCompatibleType(resource, unpackedClassName(),
typeUrl(), typeUrlV2());
unpackedMessage = unpackCompatibleType(resource, unpackedClassName(), typeUrl(), null);
} catch (InvalidProtocolBufferException e) {
errors.add(String.format("%s response Resource index %d - can't decode %s: %s",
typeName(), i, unpackedClassName().getSimpleName(), e.getMessage()));
@ -158,7 +150,7 @@ abstract class XdsResourceType<T extends ResourceUpdate> {
T resourceUpdate;
try {
resourceUpdate = doParse(args, unpackedMessage, isResourceV3);
resourceUpdate = doParse(args, unpackedMessage);
} catch (XdsClientImpl.ResourceInvalidException e) {
errors.add(String.format("%s response %s '%s' validation error: %s",
typeName(), unpackedClassName().getSimpleName(), cname, e.getMessage()));
@ -174,8 +166,7 @@ abstract class XdsResourceType<T extends ResourceUpdate> {
}
abstract T doParse(Args args, Message unpackedMessage, boolean isResourceV3)
throws ResourceInvalidException;
abstract T doParse(Args args, Message unpackedMessage) throws ResourceInvalidException;
/**
* Helper method to unpack serialized {@link com.google.protobuf.Any} message, while replacing
@ -200,10 +191,9 @@ abstract class XdsResourceType<T extends ResourceUpdate> {
private Any maybeUnwrapResources(Any resource)
throws InvalidProtocolBufferException {
if (resource.getTypeUrl().equals(TYPE_URL_RESOURCE_V2)
|| resource.getTypeUrl().equals(TYPE_URL_RESOURCE_V3)) {
return unpackCompatibleType(resource, Resource.class, TYPE_URL_RESOURCE_V3,
TYPE_URL_RESOURCE_V2).getResource();
if (resource.getTypeUrl().equals(TYPE_URL_RESOURCE)) {
return unpackCompatibleType(resource, Resource.class, TYPE_URL_RESOURCE,
null).getResource();
} else {
return resource;
}

View File

@ -65,8 +65,6 @@ import java.util.Set;
import javax.annotation.Nullable;
class XdsRouteConfigureResource extends XdsResourceType<RdsUpdate> {
static final String ADS_TYPE_URL_RDS_V2 =
"type.googleapis.com/envoy.api.v2.RouteConfiguration";
static final String ADS_TYPE_URL_RDS =
"type.googleapis.com/envoy.config.route.v3.RouteConfiguration";
private static final String TYPE_URL_FILTER_CONFIG =
@ -102,11 +100,6 @@ class XdsRouteConfigureResource extends XdsResourceType<RdsUpdate> {
return ADS_TYPE_URL_RDS;
}
@Override
String typeUrlV2() {
return ADS_TYPE_URL_RDS_V2;
}
@Override
boolean isFullStateOfTheWorld() {
return false;
@ -118,13 +111,13 @@ class XdsRouteConfigureResource extends XdsResourceType<RdsUpdate> {
}
@Override
RdsUpdate doParse(XdsResourceType.Args args, Message unpackedMessage, boolean isResourceV3)
RdsUpdate doParse(XdsResourceType.Args args, Message unpackedMessage)
throws ResourceInvalidException {
if (!(unpackedMessage instanceof RouteConfiguration)) {
throw new ResourceInvalidException("Invalid message type: " + unpackedMessage.getClass());
}
return processRouteConfiguration((RouteConfiguration) unpackedMessage,
args.filterRegistry, enableFaultInjection && isResourceV3);
args.filterRegistry, enableFaultInjection);
}
private static RdsUpdate processRouteConfiguration(

View File

@ -180,9 +180,8 @@ final class XdsServerWrapper extends Server {
return;
}
xdsClient = xdsClientPool.getObject();
boolean useProtocolV3 = xdsClient.getBootstrapInfo().servers().get(0).useProtocolV3();
String listenerTemplate = xdsClient.getBootstrapInfo().serverListenerResourceNameTemplate();
if (!useProtocolV3 || listenerTemplate == null) {
if (listenerTemplate == null) {
StatusException statusException =
Status.UNAVAILABLE.withDescription(
"Can only support xDS v3 with listener resource name template").asException();

View File

@ -579,8 +579,6 @@ public class BootstrapperImplTest {
assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
assertThat(serverInfo.ignoreResourceDeletion()).isFalse();
// xds v2: xds v3 disabled
assertThat(serverInfo.useProtocolV3()).isFalse();
}
@Test
@ -603,8 +601,6 @@ public class BootstrapperImplTest {
assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
assertThat(serverInfo.ignoreResourceDeletion()).isFalse();
// xds_v3 enabled
assertThat(serverInfo.useProtocolV3()).isTrue();
}
@Test
@ -627,7 +623,6 @@ public class BootstrapperImplTest {
assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
// Only ignore_resource_deletion feature enabled: confirm it's on, and xds_v3 is off.
assertThat(serverInfo.useProtocolV3()).isFalse();
assertThat(serverInfo.ignoreResourceDeletion()).isTrue();
}
@ -650,8 +645,7 @@ public class BootstrapperImplTest {
ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
// xds_v3 and ignore_resource_deletion features enabled: confirm both are on.
assertThat(serverInfo.useProtocolV3()).isTrue();
// ignore_resource_deletion features enabled: confirm both are on.
assertThat(serverInfo.ignoreResourceDeletion()).isTrue();
}

View File

@ -84,7 +84,7 @@ public class CdsLoadBalancer2Test {
private static final String EDS_SERVICE_NAME = "backend-service-1.googleapis.com";
private static final String DNS_HOST_NAME = "backend-service-dns.googleapis.com:443";
private static final ServerInfo LRS_SERVER_INFO =
ServerInfo.create("lrs.googleapis.com", InsecureChannelCredentials.create(), true);
ServerInfo.create("lrs.googleapis.com", InsecureChannelCredentials.create());
private final UpstreamTlsContext upstreamTlsContext =
CommonTlsContextTestsUtil.buildUpstreamTlsContext("google_cloud_private_spiffe", true);
private final OutlierDetection outlierDetection = OutlierDetection.create(

View File

@ -88,7 +88,7 @@ public class ClusterImplLoadBalancerTest {
private static final String CLUSTER = "cluster-foo.googleapis.com";
private static final String EDS_SERVICE_NAME = "service.googleapis.com";
private static final ServerInfo LRS_SERVER_INFO =
ServerInfo.create("api.google.com", InsecureChannelCredentials.create(), true);
ServerInfo.create("api.google.com", InsecureChannelCredentials.create());
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override

View File

@ -113,7 +113,7 @@ public class ClusterResolverLoadBalancerTest {
private static final String EDS_SERVICE_NAME2 = "backend-service-bar.googleapis.com";
private static final String DNS_HOST_NAME = "dns-service.googleapis.com";
private static final ServerInfo LRS_SERVER_INFO =
ServerInfo.create("lrs.googleapis.com", InsecureChannelCredentials.create(), true);
ServerInfo.create("lrs.googleapis.com", InsecureChannelCredentials.create());
private final Locality locality1 =
Locality.create("test-region-1", "test-zone-1", "test-subzone-1");
private final Locality locality2 =

View File

@ -73,7 +73,7 @@ public class CsdsServiceTest {
EnvoyProtoData.Node.newBuilder().setId(NODE_ID).build();
private static final BootstrapInfo BOOTSTRAP_INFO = BootstrapInfo.builder()
.servers(ImmutableList.of(
ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create(), true)))
ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create())))
.node(BOOTSTRAP_NODE)
.build();
private static final FakeXdsClient XDS_CLIENT_NO_RESOURCES = new FakeXdsClient();

View File

@ -86,41 +86,6 @@ public class EnvoyProtoDataTest {
.addClientFeatures("feature-2")
.build();
assertThat(node.toEnvoyProtoNode()).isEqualTo(nodeProto);
@SuppressWarnings("deprecation") // Deprecated v2 API setBuildVersion().
io.envoyproxy.envoy.api.v2.core.Node nodeProtoV2 =
io.envoyproxy.envoy.api.v2.core.Node.newBuilder()
.setId("node-id")
.setCluster("cluster")
.setMetadata(Struct.newBuilder()
.putFields("TRAFFICDIRECTOR_INTERCEPTION_PORT",
Value.newBuilder().setStringValue("ENVOY_PORT").build())
.putFields("TRAFFICDIRECTOR_NETWORK_NAME",
Value.newBuilder().setStringValue("VPC_NETWORK_NAME").build()))
.setLocality(
io.envoyproxy.envoy.api.v2.core.Locality.newBuilder()
.setRegion("region")
.setZone("zone")
.setSubZone("subzone"))
.addListeningAddresses(
io.envoyproxy.envoy.api.v2.core.Address.newBuilder()
.setSocketAddress(
io.envoyproxy.envoy.api.v2.core.SocketAddress.newBuilder()
.setAddress("www.foo.com")
.setPortValue(8080)))
.addListeningAddresses(
io.envoyproxy.envoy.api.v2.core.Address.newBuilder()
.setSocketAddress(
io.envoyproxy.envoy.api.v2.core.SocketAddress.newBuilder()
.setAddress("www.bar.com")
.setPortValue(8088)))
.setBuildVersion("v1")
.setUserAgentName("agent")
.setUserAgentVersion("1.1")
.addClientFeatures("feature-1")
.addClientFeatures("feature-2")
.build();
assertThat(node.toEnvoyProtoNodeV2()).isEqualTo(nodeProtoV2);
}
@Test

View File

@ -33,12 +33,12 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats;
import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse;
import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats;
import io.envoyproxy.envoy.config.endpoint.v3.UpstreamLocalityStats;
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
import io.grpc.Context;
import io.grpc.Context.CancellationListener;
import io.grpc.ManagedChannel;
@ -172,7 +172,7 @@ public class LoadReportClientTest {
when(backoffPolicy2.nextBackoffNanos())
.thenReturn(TimeUnit.SECONDS.toNanos(2L), TimeUnit.SECONDS.toNanos(20L));
addFakeStatsData();
lrsClient = new LoadReportClient(loadStatsManager, channel, Context.ROOT, false, NODE,
lrsClient = new LoadReportClient(loadStatsManager, channel, Context.ROOT, NODE,
syncContext, fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
fakeClock.getStopwatchSupplier());
syncContext.execute(new Runnable() {

View File

@ -67,7 +67,7 @@ public class SharedXdsClientPoolProviderTest {
@Test
public void sharedXdsClientObjectPool() throws XdsInitializationException {
ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create(), false);
ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create());
BootstrapInfo bootstrapInfo =
BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build();
when(bootstrapper.bootstrap()).thenReturn(bootstrapInfo);
@ -84,7 +84,7 @@ public class SharedXdsClientPoolProviderTest {
@Test
public void refCountedXdsClientObjectPool_delayedCreation() {
ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create(), false);
ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create());
BootstrapInfo bootstrapInfo =
BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build();
RefCountedXdsClientObjectPool xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo);
@ -96,7 +96,7 @@ public class SharedXdsClientPoolProviderTest {
@Test
public void refCountedXdsClientObjectPool_refCounted() {
ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create(), false);
ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create());
BootstrapInfo bootstrapInfo =
BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build();
RefCountedXdsClientObjectPool xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo);
@ -115,7 +115,7 @@ public class SharedXdsClientPoolProviderTest {
@Test
public void refCountedXdsClientObjectPool_getObjectCreatesNewInstanceIfAlreadyShutdown() {
ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create(), false);
ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create());
BootstrapInfo bootstrapInfo =
BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build();
RefCountedXdsClientObjectPool xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo);

View File

@ -152,7 +152,7 @@ import org.junit.runners.JUnit4;
public class XdsClientImplDataTest {
private static final ServerInfo LRS_SERVER_INFO =
ServerInfo.create("lrs.googleapis.com", InsecureChannelCredentials.create(), true);
ServerInfo.create("lrs.googleapis.com", InsecureChannelCredentials.create());
@SuppressWarnings("deprecation") // https://github.com/grpc/grpc-java/issues/7467
@Rule
@ -2719,9 +2719,6 @@ public class XdsClientImplDataTest {
assertThat(XdsClient.isResourceNameValid(traditionalResource,
XdsClusterResource.getInstance().typeUrl()))
.isTrue();
assertThat(XdsClient.isResourceNameValid(traditionalResource,
XdsRouteConfigureResource.getInstance().typeUrlV2()))
.isTrue();
String invalidPath = "xdstp:/abc/efg";
assertThat(XdsClient.isResourceNameValid(invalidPath,
@ -2736,8 +2733,6 @@ public class XdsClientImplDataTest {
XdsListenerResource.getInstance().typeUrl())).isFalse();
assertThat(XdsClient.isResourceNameValid(typeMatch,
XdsRouteConfigureResource.getInstance().typeUrl())).isTrue();
assertThat(XdsClient.isResourceNameValid(typeMatch,
XdsRouteConfigureResource.getInstance().typeUrlV2())).isFalse();
}
@Test

View File

@ -32,18 +32,22 @@ import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.UInt32Value;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.config.cluster.v3.OutlierDetection;
import io.envoyproxy.envoy.config.route.v3.FilterConfig;
import io.envoyproxy.envoy.config.route.v3.WeightedCluster;
import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateProviderPluginInstance;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc;
import io.grpc.BindableService;
import io.grpc.ChannelCredentials;
import io.grpc.Context;
@ -63,6 +67,7 @@ import io.grpc.internal.JsonUtil;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.internal.TimeProvider;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.Bootstrapper.AuthorityInfo;
import io.grpc.xds.Bootstrapper.CertificateProviderInfo;
@ -119,6 +124,8 @@ import org.mockito.MockitoAnnotations;
* Tests for {@link XdsClientImpl}.
*/
@RunWith(JUnit4.class)
// The base class was used to test both xds v2 and v3. V2 is dropped now so the base class is not
// necessary. Still keep it for future version usage. Remove if too much trouble to maintain.
public abstract class XdsClientImplTestBase {
private static final String SERVER_URI = "trafficdirector.googleapis.com";
private static final String SERVER_URI_CUSTOME_AUTHORITY = "trafficdirector2.googleapis.com";
@ -190,7 +197,10 @@ public abstract class XdsClientImplTestBase {
private final FakeClock fakeClock = new FakeClock();
protected final BlockingDeque<DiscoveryRpcCall> resourceDiscoveryCalls =
new LinkedBlockingDeque<>(1);
protected final BlockingDeque<DiscoveryRpcCall> resourceDiscoveryCallsV2 =
new LinkedBlockingDeque<>(1);
protected final Queue<LrsRpcCall> loadReportCalls = new ArrayDeque<>();
protected final Queue<LrsRpcCall> loadReportCallsV2 = new ArrayDeque<>();
protected final AtomicBoolean adsEnded = new AtomicBoolean(true);
protected final AtomicBoolean lrsEnded = new AtomicBoolean(true);
private final MessageFactory mf = createMessageFactory();
@ -298,7 +308,9 @@ public abstract class XdsClientImplTestBase {
xdsServer = cleanupRule.register(InProcessServerBuilder
.forName(serverName)
.addService(adsService)
.addService(createAdsServiceV2())
.addService(lrsService)
.addService(createLrsServiceV2())
.directExecutor()
.build()
.start());
@ -328,7 +340,7 @@ public abstract class XdsClientImplTestBase {
}
};
xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, useProtocolV3(),
xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS,
ignoreResourceDeletion());
Bootstrapper.BootstrapInfo bootstrapInfo =
Bootstrapper.BootstrapInfo.builder()
@ -339,12 +351,12 @@ public abstract class XdsClientImplTestBase {
AuthorityInfo.create(
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS, useProtocolV3()))),
SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))),
"",
AuthorityInfo.create(
"xdstp:///envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS, useProtocolV3())))))
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
.certProviders(ImmutableMap.of("cert-instance-name",
CertificateProviderInfo.create("file-watcher", ImmutableMap.<String, Object>of())))
.build();
@ -423,11 +435,9 @@ public abstract class XdsClientImplTestBase {
int size) {
if (size == 0) {
assertThat(subscribedTypeUrls.containsKey(type.typeUrl())).isFalse();
assertThat(subscribedTypeUrls.containsKey(type.typeUrlV2())).isFalse();
assertThat(subscribedResourcesMetadata.containsKey(type)).isFalse();
} else {
assertThat(subscribedTypeUrls.containsKey(type.typeUrl())).isTrue();
assertThat(subscribedTypeUrls.containsKey(type.typeUrlV2())).isTrue();
assertThat(subscribedResourcesMetadata.get(type)).hasSize(size);
}
}
@ -3589,18 +3599,22 @@ public abstract class XdsClientImplTestBase {
protected abstract static class DiscoveryRpcCall {
protected abstract void verifyRequest(
protected void verifyRequest(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
Node node);
Node node) {
throw new UnsupportedOperationException();
}
protected void verifyRequest(
XdsResourceType<?> type, String resource, String versionInfo, String nonce, Node node) {
verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node);
}
protected abstract void verifyRequestNack(
protected void verifyRequestNack(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
Node node, List<String> errorMessages);
Node node, List<String> errorMessages) {
throw new UnsupportedOperationException();
}
protected void verifyRequestNack(
XdsResourceType<?> type, String resource, String versionInfo, String nonce, Node node,
@ -3608,19 +3622,27 @@ public abstract class XdsClientImplTestBase {
verifyRequestNack(type, ImmutableList.of(resource), versionInfo, nonce, node, errorMessages);
}
protected abstract void verifyNoMoreRequest();
protected void verifyNoMoreRequest() {
throw new UnsupportedOperationException();
}
protected abstract void sendResponse(
XdsResourceType<?> type, List<Any> resources, String versionInfo, String nonce);
protected void sendResponse(
XdsResourceType<?> type, List<Any> resources, String versionInfo, String nonce) {
throw new UnsupportedOperationException();
}
protected void sendResponse(XdsResourceType<?> type, Any resource, String versionInfo,
String nonce) {
sendResponse(type, ImmutableList.of(resource), versionInfo, nonce);
}
protected abstract void sendError(Throwable t);
protected void sendError(Throwable t) {
throw new UnsupportedOperationException();
}
protected abstract void sendCompleted();
protected void sendCompleted() {
throw new UnsupportedOperationException();
}
}
protected abstract static class LrsRpcCall {
@ -3628,9 +3650,13 @@ public abstract class XdsClientImplTestBase {
/**
* Verifies a LRS request has been sent with ClusterStats of the given list of clusters.
*/
protected abstract void verifyNextReportClusters(List<String[]> clusters);
protected void verifyNextReportClusters(List<String[]> clusters) {
throw new UnsupportedOperationException();
}
protected abstract void sendResponse(List<String> clusters, long loadReportIntervalNano);
protected void sendResponse(List<String> clusters, long loadReportIntervalNano) {
throw new UnsupportedOperationException();
}
}
protected abstract static class MessageFactory {
@ -3731,4 +3757,83 @@ public abstract class XdsClientImplTestBase {
protected abstract Message buildTerminalFilter();
}
@Test
public void dropXdsV2Lds() {
startResourceWatcher(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher);
assertThat(resourceDiscoveryCallsV2).isEmpty();
assertThat(loadReportCallsV2).isEmpty();
}
@Test
public void dropXdsV2Cds() {
startResourceWatcher(XdsClusterResource.getInstance(), CDS_RESOURCE, cdsResourceWatcher);
assertThat(resourceDiscoveryCallsV2).isEmpty();
assertThat(loadReportCallsV2).isEmpty();
}
@Test
public void dropXdsV2Rds() {
startResourceWatcher(XdsRouteConfigureResource.getInstance(), RDS_RESOURCE, rdsResourceWatcher);
assertThat(resourceDiscoveryCallsV2).isEmpty();
assertThat(loadReportCallsV2).isEmpty();
}
@Test
public void dropXdsV2Eds() {
startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE, edsResourceWatcher);
assertThat(resourceDiscoveryCallsV2).isEmpty();
assertThat(loadReportCallsV2).isEmpty();
}
protected BindableService createAdsServiceV2() {
return new AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase() {
@Override
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
final StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryResponse> responseObserver) {
assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended
adsEnded.set(false);
@SuppressWarnings("unchecked")
StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryRequest> requestObserver =
mock(StreamObserver.class);
DiscoveryRpcCall call = new DiscoveryRpcCall() {};
resourceDiscoveryCallsV2.offer(call);
Context.current().addListener(
new Context.CancellationListener() {
@Override
public void cancelled(Context context) {
adsEnded.set(true);
}
}, MoreExecutors.directExecutor());
return requestObserver;
}
};
}
protected BindableService createLrsServiceV2() {
return new LoadReportingServiceGrpc.LoadReportingServiceImplBase() {
@Override
public
StreamObserver<io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest>
streamLoadStats(
StreamObserver<io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse>
responseObserver) {
assertThat(lrsEnded.get()).isTrue();
lrsEnded.set(false);
@SuppressWarnings("unchecked")
StreamObserver<io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest> requestObserver =
mock(StreamObserver.class);
LrsRpcCall call = new LrsRpcCall() {};
Context.current().addListener(
new Context.CancellationListener() {
@Override
public void cancelled(Context context) {
lrsEnded.set(true);
}
}, MoreExecutors.directExecutor());
loadReportCallsV2.offer(call);
return requestObserver;
}
};
}
}

View File

@ -1,774 +0,0 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.Message;
import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
import com.google.protobuf.util.Durations;
import com.google.rpc.Code;
import io.envoyproxy.envoy.api.v2.Cluster;
import io.envoyproxy.envoy.api.v2.Cluster.CustomClusterType;
import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType;
import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig;
import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy;
import io.envoyproxy.envoy.api.v2.Cluster.LeastRequestLbConfig;
import io.envoyproxy.envoy.api.v2.Cluster.RingHashLbConfig;
import io.envoyproxy.envoy.api.v2.Cluster.RingHashLbConfig.HashFunction;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.api.v2.Listener;
import io.envoyproxy.envoy.api.v2.Resource;
import io.envoyproxy.envoy.api.v2.RouteConfiguration;
import io.envoyproxy.envoy.api.v2.auth.CommonTlsContext;
import io.envoyproxy.envoy.api.v2.auth.SdsSecretConfig;
import io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext;
import io.envoyproxy.envoy.api.v2.cluster.CircuitBreakers;
import io.envoyproxy.envoy.api.v2.cluster.CircuitBreakers.Thresholds;
import io.envoyproxy.envoy.api.v2.cluster.OutlierDetection;
import io.envoyproxy.envoy.api.v2.core.Address;
import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource;
import io.envoyproxy.envoy.api.v2.core.ApiConfigSource;
import io.envoyproxy.envoy.api.v2.core.ConfigSource;
import io.envoyproxy.envoy.api.v2.core.GrpcService;
import io.envoyproxy.envoy.api.v2.core.GrpcService.GoogleGrpc;
import io.envoyproxy.envoy.api.v2.core.HealthStatus;
import io.envoyproxy.envoy.api.v2.core.Locality;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.envoyproxy.envoy.api.v2.core.RoutingPriority;
import io.envoyproxy.envoy.api.v2.core.SelfConfigSource;
import io.envoyproxy.envoy.api.v2.core.SocketAddress;
import io.envoyproxy.envoy.api.v2.core.TransportSocket;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.envoyproxy.envoy.api.v2.endpoint.Endpoint;
import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint;
import io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints;
import io.envoyproxy.envoy.api.v2.listener.FilterChain;
import io.envoyproxy.envoy.api.v2.route.Route;
import io.envoyproxy.envoy.api.v2.route.RouteAction;
import io.envoyproxy.envoy.api.v2.route.RouteMatch;
import io.envoyproxy.envoy.api.v2.route.VirtualHost;
import io.envoyproxy.envoy.config.cluster.aggregate.v2alpha.ClusterConfig;
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager;
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpFilter;
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds;
import io.envoyproxy.envoy.config.listener.v2.ApiListener;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc.LoadReportingServiceImplBase;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse;
import io.envoyproxy.envoy.type.FractionalPercent;
import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType;
import io.envoyproxy.envoy.type.matcher.RegexMatcher;
import io.grpc.BindableService;
import io.grpc.Context;
import io.grpc.Context.CancellationListener;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
import org.mockito.Mockito;
/**
* Tests for {@link XdsClientImpl} with protocol version v2.
*/
@RunWith(Parameterized.class)
public class XdsClientImplV2Test extends XdsClientImplTestBase {
/** Parameterized test cases. */
@Parameters(name = "ignoreResourceDeletion={0}")
public static Iterable<? extends Boolean> data() {
return ImmutableList.of(false, true);
}
@Parameter
public boolean ignoreResourceDeletion;
@Override
protected BindableService createAdsService() {
return new AggregatedDiscoveryServiceImplBase() {
@Override
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
final StreamObserver<DiscoveryResponse> responseObserver) {
assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended
adsEnded.set(false);
@SuppressWarnings("unchecked")
StreamObserver<DiscoveryRequest> requestObserver = mock(StreamObserver.class);
DiscoveryRpcCall call = new DiscoveryRpcCallV2(requestObserver, responseObserver);
resourceDiscoveryCalls.offer(call);
Context.current().addListener(
new CancellationListener() {
@Override
public void cancelled(Context context) {
adsEnded.set(true);
}
}, MoreExecutors.directExecutor());
return requestObserver;
}
};
}
@Override
protected BindableService createLrsService() {
return new LoadReportingServiceImplBase() {
@Override
public StreamObserver<LoadStatsRequest> streamLoadStats(
StreamObserver<LoadStatsResponse> responseObserver) {
assertThat(lrsEnded.get()).isTrue();
lrsEnded.set(false);
@SuppressWarnings("unchecked")
StreamObserver<LoadStatsRequest> requestObserver = mock(StreamObserver.class);
LrsRpcCall call = new LrsRpcCallV2(requestObserver, responseObserver);
Context.current().addListener(
new CancellationListener() {
@Override
public void cancelled(Context context) {
lrsEnded.set(true);
}
}, MoreExecutors.directExecutor());
loadReportCalls.offer(call);
return requestObserver;
}
};
}
@Override
protected MessageFactory createMessageFactory() {
return new MessageFactoryV2();
}
@Override
protected boolean useProtocolV3() {
return false;
}
@Override
protected boolean ignoreResourceDeletion() {
return ignoreResourceDeletion;
}
private static class DiscoveryRpcCallV2 extends DiscoveryRpcCall {
StreamObserver<DiscoveryRequest> requestObserver;
StreamObserver<DiscoveryResponse> responseObserver;
private DiscoveryRpcCallV2(StreamObserver<DiscoveryRequest> requestObserver,
StreamObserver<DiscoveryResponse> responseObserver) {
this.requestObserver = requestObserver;
this.responseObserver = responseObserver;
}
@Override
protected void verifyRequest(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
EnvoyProtoData.Node node) {
verify(requestObserver, Mockito.timeout(2000)).onNext(argThat(new DiscoveryRequestMatcher(
node.toEnvoyProtoNodeV2(), versionInfo, resources, type.typeUrlV2(), nonce, null, null)));
}
@Override
protected void verifyRequestNack(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
EnvoyProtoData.Node node, List<String> errorMessages) {
verify(requestObserver, Mockito.timeout(2000)).onNext(argThat(new DiscoveryRequestMatcher(
node.toEnvoyProtoNodeV2(), versionInfo, resources, type.typeUrlV2(), nonce,
Code.INVALID_ARGUMENT_VALUE, errorMessages)));
}
@Override
protected void verifyNoMoreRequest() {
verifyNoMoreInteractions(requestObserver);
}
@Override
protected void sendResponse(
XdsResourceType<?> type, List<Any> resources, String versionInfo, String nonce) {
DiscoveryResponse response =
DiscoveryResponse.newBuilder()
.setVersionInfo(versionInfo)
.addAllResources(resources)
.setTypeUrl(type.typeUrl())
.setNonce(nonce)
.build();
responseObserver.onNext(response);
}
@Override
protected void sendError(Throwable t) {
responseObserver.onError(t);
}
@Override
protected void sendCompleted() {
responseObserver.onCompleted();
}
}
private static class LrsRpcCallV2 extends LrsRpcCall {
private final StreamObserver<LoadStatsRequest> requestObserver;
private final StreamObserver<LoadStatsResponse> responseObserver;
private final InOrder inOrder;
private LrsRpcCallV2(StreamObserver<LoadStatsRequest> requestObserver,
StreamObserver<LoadStatsResponse> responseObserver) {
this.requestObserver = requestObserver;
this.responseObserver = responseObserver;
inOrder = inOrder(requestObserver);
}
@Override
protected void verifyNextReportClusters(List<String[]> clusters) {
inOrder.verify(requestObserver).onNext(argThat(new LrsRequestMatcher(clusters)));
}
@Override
protected void sendResponse(List<String> clusters, long loadReportIntervalNano) {
LoadStatsResponse response =
LoadStatsResponse.newBuilder()
.addAllClusters(clusters)
.setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNano))
.build();
responseObserver.onNext(response);
}
}
private static class MessageFactoryV2 extends MessageFactory {
@Override
protected Any buildWrappedResource(Any originalResource) {
return Any.pack(Resource.newBuilder()
.setResource(originalResource)
.build());
}
@SuppressWarnings("unchecked")
@Override
protected Message buildListenerWithApiListener(
String name, Message routeConfiguration, List<? extends Message> httpFilters) {
return Listener.newBuilder()
.setName(name)
.setAddress(Address.getDefaultInstance())
.addFilterChains(FilterChain.getDefaultInstance())
.setApiListener(
ApiListener.newBuilder().setApiListener(Any.pack(
HttpConnectionManager.newBuilder()
.setRouteConfig((RouteConfiguration) routeConfiguration)
.addAllHttpFilters((List<HttpFilter>) httpFilters)
.build())))
.build();
}
@Override
protected Message buildListenerWithApiListenerForRds(String name, String rdsResourceName) {
return Listener.newBuilder()
.setName(name)
.setAddress(Address.getDefaultInstance())
.addFilterChains(FilterChain.getDefaultInstance())
.setApiListener(
ApiListener.newBuilder().setApiListener(Any.pack(
HttpConnectionManager.newBuilder()
.setRds(
Rds.newBuilder()
.setRouteConfigName(rdsResourceName)
.setConfigSource(
ConfigSource.newBuilder()
.setAds(AggregatedConfigSource.getDefaultInstance())))
.build())))
.build();
}
@Override
protected Message buildListenerWithApiListenerInvalid(String name) {
return Listener.newBuilder()
.setName(name)
.setAddress(Address.getDefaultInstance())
.setApiListener(ApiListener.newBuilder().setApiListener(FAILING_ANY))
.build();
}
@Override
protected Message buildHttpFilter(String name, @Nullable Any typedConfig, boolean isOptional) {
throw new UnsupportedOperationException();
}
@Override
protected Any buildHttpFaultTypedConfig(
@Nullable Long delayNanos, @Nullable Integer delayRate, String upstreamCluster,
List<String> downstreamNodes, @Nullable Integer maxActiveFaults, @Nullable Status status,
@Nullable Integer httpCode, @Nullable Integer abortRate) {
throw new UnsupportedOperationException();
}
@Override
protected Message buildRouteConfiguration(String name, List<Message> virtualHostList) {
RouteConfiguration.Builder builder = RouteConfiguration.newBuilder();
builder.setName(name);
for (Message virtualHost : virtualHostList) {
builder.addVirtualHosts((VirtualHost) virtualHost);
}
return builder.build();
}
@Override
protected Message buildRouteConfigurationInvalid(String name) {
// Invalid Path matcher: Pattern.compile() will throw PatternSyntaxException
// when attempting to process SAFE_REGEX RouteMatch malformed safe regex pattern.
// I wish there was a simpler way.
return RouteConfiguration.newBuilder()
.setName(name)
.addVirtualHosts(
VirtualHost.newBuilder()
.setName("do not care")
.addDomains("do not care")
.addRoutes(
Route.newBuilder()
.setRoute(RouteAction.newBuilder().setCluster("do not care"))
.setMatch(RouteMatch.newBuilder()
.setSafeRegex(RegexMatcher.newBuilder().setRegex("[z-a]")))))
.build();
}
@Override
protected List<Message> buildOpaqueVirtualHosts(int num) {
List<Message> virtualHosts = new ArrayList<>(num);
for (int i = 0; i < num; i++) {
VirtualHost virtualHost =
VirtualHost.newBuilder()
.setName(num + ": do not care")
.addDomains("do not care")
.addRoutes(
Route.newBuilder()
.setRoute(RouteAction.newBuilder().setCluster("do not care"))
.setMatch(RouteMatch.newBuilder()
.setPrefix("do not care")))
.build();
virtualHosts.add(virtualHost);
}
return virtualHosts;
}
@SuppressWarnings("unchecked")
@Override
protected Message buildVirtualHost(
List<? extends Message> routes, Map<String, Any> typedConfigMap) {
return VirtualHost.newBuilder()
.setName("do not care")
.addDomains("do not care")
.addAllRoutes((List<Route>) routes)
.putAllTypedPerFilterConfig(typedConfigMap)
.build();
}
@Override
protected List<? extends Message> buildOpaqueRoutes(int num) {
List<Route> routes = new ArrayList<>(num);
for (int i = 0; i < num; i++) {
Route route =
Route.newBuilder()
.setRoute(RouteAction.newBuilder().setCluster("do not care"))
.setMatch(RouteMatch.newBuilder().setPrefix("do not care"))
.build();
routes.add(route);
}
return routes;
}
@Override
protected Message buildClusterInvalid(String name) {
// Unspecified cluster discovery type
return Cluster.newBuilder().setName(name).build();
}
@Override
protected Message buildEdsCluster(String clusterName, @Nullable String edsServiceName,
String lbPolicy, @Nullable Message ringHashLbConfig, @Nullable Message leastRequestLbConfig,
boolean enableLrs,
@Nullable Message upstreamTlsContext, String transportSocketName,
@Nullable Message circuitBreakers, @Nullable Message outlierDetection) {
Cluster.Builder builder = initClusterBuilder(
clusterName, lbPolicy, ringHashLbConfig, leastRequestLbConfig,
enableLrs, upstreamTlsContext, circuitBreakers, outlierDetection);
builder.setType(DiscoveryType.EDS);
EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder();
edsClusterConfigBuilder.setEdsConfig(
ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())); // ADS
if (edsServiceName != null) {
edsClusterConfigBuilder.setServiceName(edsServiceName);
}
builder.setEdsClusterConfig(edsClusterConfigBuilder);
return builder.build();
}
@Override
protected Message buildLogicalDnsCluster(String clusterName, String dnsHostAddr,
int dnsHostPort, String lbPolicy, @Nullable Message ringHashLbConfig,
@Nullable Message leastRequestLbConfig, boolean enableLrs,
@Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) {
Cluster.Builder builder = initClusterBuilder(
clusterName, lbPolicy, ringHashLbConfig, leastRequestLbConfig,
enableLrs, upstreamTlsContext, circuitBreakers, null);
builder.setType(DiscoveryType.LOGICAL_DNS);
builder.setLoadAssignment(
ClusterLoadAssignment.newBuilder().addEndpoints(
LocalityLbEndpoints.newBuilder().addLbEndpoints(
LbEndpoint.newBuilder().setEndpoint(
Endpoint.newBuilder().setAddress(
Address.newBuilder().setSocketAddress(
SocketAddress.newBuilder()
.setAddress(dnsHostAddr).setPortValue(dnsHostPort)))))).build());
return builder.build();
}
@Override
protected Message buildAggregateCluster(String clusterName, String lbPolicy,
@Nullable Message ringHashLbConfig, @Nullable Message leastRequestLbConfig,
List<String> clusters) {
ClusterConfig clusterConfig = ClusterConfig.newBuilder().addAllClusters(clusters).build();
CustomClusterType type =
CustomClusterType.newBuilder()
.setName(XdsResourceType.AGGREGATE_CLUSTER_TYPE_NAME)
.setTypedConfig(Any.pack(clusterConfig))
.build();
Cluster.Builder builder = Cluster.newBuilder().setName(clusterName).setClusterType(type);
if (lbPolicy.equals("round_robin")) {
builder.setLbPolicy(LbPolicy.ROUND_ROBIN);
} else if (lbPolicy.equals("ring_hash_experimental")) {
builder.setLbPolicy(LbPolicy.RING_HASH);
builder.setRingHashLbConfig((RingHashLbConfig) ringHashLbConfig);
} else if (lbPolicy.equals("least_request_experimental")) {
builder.setLbPolicy(LbPolicy.LEAST_REQUEST);
builder.setLeastRequestLbConfig((LeastRequestLbConfig) leastRequestLbConfig);
} else {
throw new AssertionError("Invalid LB policy");
}
return builder.build();
}
private Cluster.Builder initClusterBuilder(String clusterName, String lbPolicy,
@Nullable Message ringHashLbConfig, @Nullable Message leastRequestLbConfig,
boolean enableLrs, @Nullable Message upstreamTlsContext,
@Nullable Message circuitBreakers, @Nullable Message outlierDetection) {
Cluster.Builder builder = Cluster.newBuilder();
builder.setName(clusterName);
if (lbPolicy.equals("round_robin")) {
builder.setLbPolicy(LbPolicy.ROUND_ROBIN);
} else if (lbPolicy.equals("ring_hash_experimental")) {
builder.setLbPolicy(LbPolicy.RING_HASH);
builder.setRingHashLbConfig((RingHashLbConfig) ringHashLbConfig);
} else if (lbPolicy.equals("least_request_experimental")) {
builder.setLbPolicy(LbPolicy.LEAST_REQUEST);
builder.setLeastRequestLbConfig((LeastRequestLbConfig) leastRequestLbConfig);
} else {
throw new AssertionError("Invalid LB policy");
}
if (enableLrs) {
builder.setLrsServer(
ConfigSource.newBuilder()
.setSelf(SelfConfigSource.getDefaultInstance()));
}
if (upstreamTlsContext != null) {
builder.setTransportSocket(
TransportSocket.newBuilder()
.setName("envoy.transport_sockets.tls")
.setTypedConfig(Any.pack(upstreamTlsContext)));
}
if (circuitBreakers != null) {
builder.setCircuitBreakers((CircuitBreakers) circuitBreakers);
}
if (outlierDetection != null) {
builder.setOutlierDetection((OutlierDetection) outlierDetection);
}
return builder;
}
@Override
protected Message buildRingHashLbConfig(String hashFunction, long minRingSize,
long maxRingSize) {
RingHashLbConfig.Builder builder = RingHashLbConfig.newBuilder();
if (hashFunction.equals("xx_hash")) {
builder.setHashFunction(HashFunction.XX_HASH);
} else if (hashFunction.equals("murmur_hash_2")) {
builder.setHashFunction(HashFunction.MURMUR_HASH_2);
} else {
throw new AssertionError("Invalid hash function");
}
builder.setMinimumRingSize(UInt64Value.newBuilder().setValue(minRingSize).build());
builder.setMaximumRingSize(UInt64Value.newBuilder().setValue(maxRingSize).build());
return builder.build();
}
@Override
protected Message buildLeastRequestLbConfig(int choiceCount) {
LeastRequestLbConfig.Builder builder = LeastRequestLbConfig.newBuilder();
builder.setChoiceCount(UInt32Value.newBuilder().setValue(choiceCount));
return builder.build();
}
@Override
protected Message buildUpstreamTlsContext(String instanceName, String certName) {
GrpcService grpcService =
GrpcService.newBuilder()
.setGoogleGrpc(GoogleGrpc.newBuilder().setTargetUri(certName))
.build();
ConfigSource sdsConfig =
ConfigSource.newBuilder()
.setApiConfigSource(ApiConfigSource.newBuilder().addGrpcServices(grpcService))
.build();
SdsSecretConfig validationContextSdsSecretConfig =
SdsSecretConfig.newBuilder()
.setName(instanceName)
.setSdsConfig(sdsConfig)
.build();
return UpstreamTlsContext.newBuilder()
.setCommonTlsContext(
CommonTlsContext.newBuilder()
.setValidationContextSdsSecretConfig(validationContextSdsSecretConfig))
.build();
}
@Override
protected Message buildNewUpstreamTlsContext(String instanceName, String certName) {
return buildUpstreamTlsContext(instanceName, certName);
}
@Override
protected Message buildCircuitBreakers(int highPriorityMaxRequests,
int defaultPriorityMaxRequests) {
return CircuitBreakers.newBuilder()
.addThresholds(
Thresholds.newBuilder()
.setPriority(RoutingPriority.HIGH)
.setMaxRequests(UInt32Value.newBuilder().setValue(highPriorityMaxRequests)))
.addThresholds(
Thresholds.newBuilder()
.setPriority(RoutingPriority.DEFAULT)
.setMaxRequests(UInt32Value.newBuilder().setValue(defaultPriorityMaxRequests)))
.build();
}
@Override
protected Message buildClusterLoadAssignment(String cluster,
List<Message> localityLbEndpointsList, List<Message> dropOverloadList) {
ClusterLoadAssignment.Builder builder = ClusterLoadAssignment.newBuilder();
builder.setClusterName(cluster);
for (Message localityLbEndpoints : localityLbEndpointsList) {
builder.addEndpoints((LocalityLbEndpoints) localityLbEndpoints);
}
Policy.Builder policyBuilder = Policy.newBuilder();
for (Message dropOverload : dropOverloadList) {
policyBuilder.addDropOverloads((DropOverload) dropOverload);
}
builder.setPolicy(policyBuilder);
return builder.build();
}
@Override
protected Message buildClusterLoadAssignmentInvalid(String cluster) {
// Negative priority LocalityLbEndpoint.
return ClusterLoadAssignment.newBuilder()
.setClusterName(cluster)
.addEndpoints(LocalityLbEndpoints.newBuilder()
.setPriority(-1)
.setLoadBalancingWeight(UInt32Value.newBuilder().setValue(1)))
.build();
}
@Override
protected Message buildLocalityLbEndpoints(String region, String zone, String subZone,
List<Message> lbEndpointList, int loadBalancingWeight, int priority) {
LocalityLbEndpoints.Builder builder = LocalityLbEndpoints.newBuilder();
builder.setLocality(
Locality.newBuilder().setRegion(region).setZone(zone).setSubZone(subZone));
for (Message lbEndpoint : lbEndpointList) {
builder.addLbEndpoints((LbEndpoint) lbEndpoint);
}
builder.setLoadBalancingWeight(UInt32Value.of(loadBalancingWeight));
builder.setPriority(priority);
return builder.build();
}
@Override
protected Message buildLbEndpoint(String address, int port, String healthStatus,
int lbWeight) {
HealthStatus status;
switch (healthStatus) {
case "unknown":
status = HealthStatus.UNKNOWN;
break;
case "healthy":
status = HealthStatus.HEALTHY;
break;
case "unhealthy":
status = HealthStatus.UNHEALTHY;
break;
case "draining":
status = HealthStatus.DRAINING;
break;
case "timeout":
status = HealthStatus.TIMEOUT;
break;
case "degraded":
status = HealthStatus.DEGRADED;
break;
default:
status = HealthStatus.UNRECOGNIZED;
}
return LbEndpoint.newBuilder()
.setEndpoint(
Endpoint.newBuilder().setAddress(
Address.newBuilder().setSocketAddress(
SocketAddress.newBuilder().setAddress(address).setPortValue(port))))
.setHealthStatus(status)
.setLoadBalancingWeight(UInt32Value.of(lbWeight))
.build();
}
@Override
protected Message buildDropOverload(String category, int dropPerMillion) {
return DropOverload.newBuilder()
.setCategory(category)
.setDropPercentage(
FractionalPercent.newBuilder()
.setNumerator(dropPerMillion)
.setDenominator(DenominatorType.MILLION))
.build();
}
@Override
protected Message buildFilterChain(List<String> alpn, Message tlsContext,
String transportSocketName, Message... filters) {
throw new UnsupportedOperationException();
}
@Override
protected Message buildListenerWithFilterChain(
String name, int portValue, String address, Message... filterChains) {
throw new UnsupportedOperationException();
}
@Override
protected Message buildHttpConnectionManagerFilter(
@Nullable String rdsName, @Nullable Message routeConfig, List<Message> httpFilters) {
throw new UnsupportedOperationException();
}
@Override
protected Message buildTerminalFilter() {
throw new UnsupportedOperationException();
}
}
/**
* Matches a {@link DiscoveryRequest} with the same node metadata, versionInfo, typeUrl,
* response nonce and collection of resource names regardless of order.
*/
private static class DiscoveryRequestMatcher implements ArgumentMatcher<DiscoveryRequest> {
private final Node node;
private final String versionInfo;
private final String typeUrl;
private final Set<String> resources;
private final String responseNonce;
@Nullable private final Integer errorCode;
private final List<String> errorMessages;
private DiscoveryRequestMatcher(Node node, String versionInfo, List<String> resources,
String typeUrl, String responseNonce, @Nullable Integer errorCode,
@Nullable List<String> errorMessages) {
this.node = node;
this.versionInfo = versionInfo;
this.resources = new HashSet<>(resources);
this.typeUrl = typeUrl;
this.responseNonce = responseNonce;
this.errorCode = errorCode;
this.errorMessages = errorMessages != null ? errorMessages : ImmutableList.<String>of();
}
@Override
public boolean matches(DiscoveryRequest argument) {
if (!typeUrl.equals(argument.getTypeUrl())) {
return false;
}
if (!versionInfo.equals(argument.getVersionInfo())) {
return false;
}
if (!responseNonce.equals(argument.getResponseNonce())) {
return false;
}
if (!resources.equals(new HashSet<>(argument.getResourceNamesList()))) {
return false;
}
if (errorCode == null && argument.hasErrorDetail()) {
return false;
}
if (errorCode != null
&& !matchErrorDetail(argument.getErrorDetail(), errorCode, errorMessages)) {
return false;
}
return node.equals(argument.getNode());
}
}
/**
* Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with
* the same list of clusterName:clusterServiceName pair.
*/
private static class LrsRequestMatcher implements ArgumentMatcher<LoadStatsRequest> {
private final List<String> expected;
private LrsRequestMatcher(List<String[]> clusterNames) {
expected = new ArrayList<>();
for (String[] pair : clusterNames) {
expected.add(pair[0] + ":" + (pair[1] == null ? "" : pair[1]));
}
Collections.sort(expected);
}
@Override
public boolean matches(LoadStatsRequest argument) {
List<String> actual = new ArrayList<>();
for (ClusterStats clusterStats : argument.getClusterStatsList()) {
actual.add(clusterStats.getClusterName() + ":" + clusterStats.getClusterServiceName());
}
Collections.sort(actual);
return actual.equals(expected);
}
}
}

View File

@ -145,7 +145,7 @@ public class XdsNameResolverTest {
private final TestChannel channel = new TestChannel();
private BootstrapInfo bootstrapInfo = BootstrapInfo.builder()
.servers(ImmutableList.of(ServerInfo.create(
"td.googleapis.com", InsecureChannelCredentials.create(), true)))
"td.googleapis.com", InsecureChannelCredentials.create())))
.node(Node.newBuilder().build())
.build();
private String expectedLdsResourceName = AUTHORITY;
@ -231,7 +231,7 @@ public class XdsNameResolverTest {
public void resolving_noTargetAuthority_templateWithoutXdstp() {
bootstrapInfo = BootstrapInfo.builder()
.servers(ImmutableList.of(ServerInfo.create(
"td.googleapis.com", InsecureChannelCredentials.create(), true)))
"td.googleapis.com", InsecureChannelCredentials.create())))
.node(Node.newBuilder().build())
.clientDefaultListenerResourceNameTemplate("%s/id=1")
.build();
@ -249,7 +249,7 @@ public class XdsNameResolverTest {
public void resolving_noTargetAuthority_templateWithXdstp() {
bootstrapInfo = BootstrapInfo.builder()
.servers(ImmutableList.of(ServerInfo.create(
"td.googleapis.com", InsecureChannelCredentials.create(), true)))
"td.googleapis.com", InsecureChannelCredentials.create())))
.node(Node.newBuilder().build())
.clientDefaultListenerResourceNameTemplate(
"xdstp://xds.authority.com/envoy.config.listener.v3.Listener/%s?id=1")

View File

@ -55,7 +55,7 @@ public class XdsServerTestHelper {
Bootstrapper.BootstrapInfo.builder()
.servers(Arrays.asList(
Bootstrapper.ServerInfo.create(
SERVER_URI, InsecureChannelCredentials.create(), true)))
SERVER_URI, InsecureChannelCredentials.create())))
.node(BOOTSTRAP_NODE)
.serverListenerResourceNameTemplate("grpc/server?udpa.resource.listening_address=%s")
.build();

View File

@ -125,15 +125,34 @@ public class XdsServerWrapperTest {
}
@Test
public void testBootstrap_notV3() throws Exception {
@SuppressWarnings("unchecked")
public void testBootstrap() throws Exception {
Bootstrapper.BootstrapInfo b =
Bootstrapper.BootstrapInfo.builder()
.servers(Arrays.asList(
Bootstrapper.ServerInfo.create("uri", InsecureChannelCredentials.create(), false)))
Bootstrapper.ServerInfo.create("uri", InsecureChannelCredentials.create())))
.node(EnvoyProtoData.Node.newBuilder().setId("id").build())
.serverListenerResourceNameTemplate("grpc/server?udpa.resource.listening_address=%s")
.build();
verifyBootstrapFail(b);
XdsClient xdsClient = mock(XdsClient.class);
XdsListenerResource listenerResource = XdsListenerResource.getInstance();
when(xdsClient.getBootstrapInfo()).thenReturn(b);
xdsServerWrapper = new XdsServerWrapper("[::FFFF:129.144.52.38]:80", mockBuilder, listener,
selectorManager, new FakeXdsClientPoolFactory(xdsClient), filterRegistry);
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
xdsServerWrapper.start();
} catch (IOException ex) {
// ignore
}
}
});
verify(xdsClient, timeout(5000)).watchXdsResource(
eq(listenerResource),
eq("grpc/server?udpa.resource.listening_address=[::FFFF:129.144.52.38]:80"),
any(ResourceWatcher.class));
}
@Test
@ -141,7 +160,7 @@ public class XdsServerWrapperTest {
Bootstrapper.BootstrapInfo b =
Bootstrapper.BootstrapInfo.builder()
.servers(Arrays.asList(
Bootstrapper.ServerInfo.create("uri", InsecureChannelCredentials.create(), true)))
Bootstrapper.ServerInfo.create("uri", InsecureChannelCredentials.create())))
.node(EnvoyProtoData.Node.newBuilder().setId("id").build())
.build();
verifyBootstrapFail(b);
@ -181,7 +200,7 @@ public class XdsServerWrapperTest {
Bootstrapper.BootstrapInfo b = Bootstrapper.BootstrapInfo.builder()
.servers(Arrays.asList(
Bootstrapper.ServerInfo.create(
"uri", InsecureChannelCredentials.create(), true)))
"uri", InsecureChannelCredentials.create())))
.node(EnvoyProtoData.Node.newBuilder().setId("id").build())
.serverListenerResourceNameTemplate(
"xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/server/%s")