xds: implement XdsClient thread-safety and synchronization for gRPC client (refactored XdsClient to client and server usages separately) (#7533)

Two major changes involved:

- Separated client and server side XdsClient code paths. Currently the single XdsClientImpl2 implementation runs separate code paths for client side and server side usages. Due to different implementation progress for client side and server side development, client and server implementations diverge in whether it supports multiple/removing watchers, response data cache, synchronization model, etc. It became cumbersome to put them together in a single class. The separation is effectively duplicating the XdsClientImpl2 class for client and server so that the two sides can develop independently. But we made this AbstractXdsClient to reuse some of the code, such as the logic for xDS RPC stream. More details can be found in go/separate-client-server-xds-client.

- Changes the synchronization model for the client side APIs. Multiple gRPC Channels will be sharing a single XdsClient instance. So the client side APIs need to be thread-safe. Also, the XdsClient needs to implement synchronization for API calls and xDS RPC callbacks without using a particular Channel's SynchronizationContext. This is done by using XdsClient's own lock.
This commit is contained in:
Chengyuan Zhang 2020-10-23 13:38:24 -07:00 committed by GitHub
parent b6601ba273
commit 40191b2f81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 4521 additions and 0 deletions

View File

@ -0,0 +1,751 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.TypeRegistry;
import com.google.protobuf.util.JsonFormat;
import com.google.rpc.Code;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.InternalLogId;
import io.grpc.Status;
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
* Common base type for XdsClient implementations, which encapsulates the layer abstraction of
* the xDS RPC stream.
*/
abstract class AbstractXdsClient extends XdsClient {
private static final String ADS_TYPE_URL_LDS_V2 = "type.googleapis.com/envoy.api.v2.Listener";
private static final String ADS_TYPE_URL_LDS =
"type.googleapis.com/envoy.config.listener.v3.Listener";
private static final String ADS_TYPE_URL_RDS_V2 =
"type.googleapis.com/envoy.api.v2.RouteConfiguration";
private static final String ADS_TYPE_URL_RDS =
"type.googleapis.com/envoy.config.route.v3.RouteConfiguration";
private static final String ADS_TYPE_URL_CDS_V2 = "type.googleapis.com/envoy.api.v2.Cluster";
private static final String ADS_TYPE_URL_CDS =
"type.googleapis.com/envoy.config.cluster.v3.Cluster";
private static final String ADS_TYPE_URL_EDS_V2 =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
private static final String ADS_TYPE_URL_EDS =
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
private final MessagePrinter respPrinter = new MessagePrinter();
private final InternalLogId logId;
private final XdsLogger logger;
private final XdsChannel xdsChannel;
private final ScheduledExecutorService timeService;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Stopwatch stopwatch;
// The node identifier to be included in xDS requests. Management server only requires the
// first request to carry the node identifier on a stream. It should be identical if present
// more than once.
// FIXME(chengyuanzhang): should immutable and invisible to child classes. Currently server side
// has some protocol workaround usages.
protected Node node;
// Last successfully applied version_info for each resource type. Starts with empty string.
// A version_info is used to update management server with client's most recent knowledge of
// resources.
private String ldsVersion = "";
private String rdsVersion = "";
private String cdsVersion = "";
private String edsVersion = "";
private boolean shutdown;
@Nullable
private AbstractAdsStream adsStream;
@Nullable
private BackoffPolicy retryBackoffPolicy;
@Nullable
private ScheduledFuture<?> rpcRetryTimer;
AbstractXdsClient(XdsChannel channel, Node node, ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier) {
this.xdsChannel = checkNotNull(channel, "channel");
this.node = checkNotNull(node, "node");
this.timeService = checkNotNull(timeService, "timeService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
logId = InternalLogId.allocate("xds-client", null);
logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created");
}
/**
* Called when an LDS response is received.
*/
// Must be synchronized.
protected void handleLdsResponse(String versionInfo, List<Any> resources, String nonce) {
}
/**
* Called when a RDS response is received.
*/
// Must be synchronized.
protected void handleRdsResponse(String versionInfo, List<Any> resources, String nonce) {
}
/**
* Called when a CDS response is received.
*/
// Must be synchronized.
protected void handleCdsResponse(String versionInfo, List<Any> resources, String nonce) {
}
/**
* Called when an EDS response is received.
*/
// Must be synchronized.
protected void handleEdsResponse(String versionInfo, List<Any> resources, String nonce) {
}
/**
* Called when the ADS stream is closed passively.
*/
// Must be synchronized.
protected void handleStreamClosed(Status error) {
}
/**
* Called when the ADS stream has been recreated.
*/
// Must be synchronized.
protected void handleStreamRestarted() {
}
/**
* Called when being shut down.
*/
// Must be synchronized.
protected void handleShutdown() {
}
/**
* Synchronizes the execution of the given {@code runnable} with other state mutating operations.
*/
protected abstract void runWithSynchronized(Runnable runnable);
@Override
final void shutdown() {
runWithSynchronized(new Runnable() {
@Override
public void run() {
shutdown = true;
logger.log(XdsLogLevel.INFO, "Shutting down");
xdsChannel.getManagedChannel().shutdown();
if (adsStream != null) {
adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
}
if (rpcRetryTimer != null) {
rpcRetryTimer.cancel(false);
}
handleShutdown();
}
});
}
@Override
public String toString() {
return logId.toString();
}
/**
* Returns the collection of resources currently subscribing to or {@code null} if not
* subscribing to any resources for the given type.
*
* <p>Note an empty collection indicates subscribing to resources of the given type with
* wildcard mode.
*/
// Must be synchronized.
@Nullable
abstract Collection<String> getSubscribedResources(ResourceType type);
/**
* Updates the resource subscription for the given resource type.
*/
// Must be synchronized.
protected final void adjustResourceSubscription(ResourceType type) {
if (isInBackoff()) {
return;
}
if (adsStream == null) {
startRpcStream();
}
Collection<String> resources = getSubscribedResources(type);
if (resources != null) {
adsStream.sendDiscoveryRequest(type, resources);
}
}
/**
* Accepts the update for the given resource type by updating the latest resource version
* and sends an ACK request to the management server.
*/
// Must be synchronized.
protected final void ackResponse(ResourceType type, String versionInfo, String nonce) {
switch (type) {
case LDS:
ldsVersion = versionInfo;
break;
case RDS:
rdsVersion = versionInfo;
break;
case CDS:
cdsVersion = versionInfo;
break;
case EDS:
edsVersion = versionInfo;
break;
case UNKNOWN:
default:
throw new AssertionError("Unknown resource type: " + type);
}
logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
type, nonce, versionInfo);
Collection<String> resources = getSubscribedResources(type);
if (resources == null) {
resources = Collections.emptyList();
}
adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, null);
}
/**
* Rejects the update for the given resource type and sends an NACK request (request with last
* accepted version) to the management server.
*/
// Must be synchronized.
protected final void nackResponse(ResourceType type, String nonce, String errorDetail) {
String versionInfo = getCurrentVersion(type);
logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
type, nonce, versionInfo);
Collection<String> resources = getSubscribedResources(type);
if (resources == null) {
resources = Collections.emptyList();
}
adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail);
}
protected final ScheduledExecutorService getTimeService() {
return timeService;
}
protected final XdsLogger getLogger() {
return logger;
}
/**
* Returns {@code true} if the resource discovery is currently in backoff.
*/
// Must be synchronized.
protected final boolean isInBackoff() {
return rpcRetryTimer != null && !rpcRetryTimer.isDone();
}
/**
* Establishes the RPC connection by creating a new RPC stream on the given channel for
* xDS protocol communication.
*/
// Must be synchronized.
private void startRpcStream() {
checkState(adsStream == null, "Previous adsStream has not been cleared yet");
if (xdsChannel.isUseProtocolV3()) {
adsStream = new AdsStreamV3();
} else {
adsStream = new AdsStreamV2();
}
adsStream.start();
logger.log(XdsLogLevel.INFO, "ADS stream started");
stopwatch.reset().start();
}
/**
* Returns the latest accepted version of the given resource type.
*/
// Must be synchronized.
private String getCurrentVersion(ResourceType type) {
String version;
switch (type) {
case LDS:
version = ldsVersion;
break;
case RDS:
version = rdsVersion;
break;
case CDS:
version = cdsVersion;
break;
case EDS:
version = edsVersion;
break;
case UNKNOWN:
default:
throw new AssertionError("Unknown resource type: " + type);
}
return version;
}
@VisibleForTesting
final class RpcRetryTask implements Runnable {
@Override
public void run() {
runWithSynchronized(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
startRpcStream();
for (ResourceType type : ResourceType.values()) {
if (type == ResourceType.UNKNOWN) {
continue;
}
Collection<String> resources = getSubscribedResources(type);
if (resources != null) {
adsStream.sendDiscoveryRequest(type, resources);
}
}
handleStreamRestarted();
}
});
}
}
protected enum ResourceType {
UNKNOWN, LDS, RDS, CDS, EDS;
String typeUrl() {
switch (this) {
case LDS:
return ADS_TYPE_URL_LDS;
case RDS:
return ADS_TYPE_URL_RDS;
case CDS:
return ADS_TYPE_URL_CDS;
case EDS:
return ADS_TYPE_URL_EDS;
case UNKNOWN:
default:
throw new AssertionError("Unknown or missing case in enum switch: " + this);
}
}
String typeUrlV2() {
switch (this) {
case LDS:
return ADS_TYPE_URL_LDS_V2;
case RDS:
return ADS_TYPE_URL_RDS_V2;
case CDS:
return ADS_TYPE_URL_CDS_V2;
case EDS:
return ADS_TYPE_URL_EDS_V2;
case UNKNOWN:
default:
throw new AssertionError("Unknown or missing case in enum switch: " + this);
}
}
private static ResourceType fromTypeUrl(String typeUrl) {
switch (typeUrl) {
case ADS_TYPE_URL_LDS:
// fall trough
case ADS_TYPE_URL_LDS_V2:
return LDS;
case ADS_TYPE_URL_RDS:
// fall through
case ADS_TYPE_URL_RDS_V2:
return RDS;
case ADS_TYPE_URL_CDS:
// fall through
case ADS_TYPE_URL_CDS_V2:
return CDS;
case ADS_TYPE_URL_EDS:
// fall through
case ADS_TYPE_URL_EDS_V2:
return EDS;
default:
return UNKNOWN;
}
}
}
private abstract class AbstractAdsStream {
private boolean responseReceived;
private boolean closed;
// Response nonce for the most recently received discovery responses of each resource type.
// Client initiated requests start response nonce with empty string.
// A nonce is used to indicate the specific DiscoveryResponse each DiscoveryRequest
// corresponds to.
// A nonce becomes stale following a newer nonce being presented to the client in a
// DiscoveryResponse.
private String ldsRespNonce = "";
private String rdsRespNonce = "";
private String cdsRespNonce = "";
private String edsRespNonce = "";
abstract void start();
abstract void sendError(Exception error);
/**
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
* {@code errorDetail}. Used for reacting to a specific discovery response. For
* client-initiated discovery requests, use {@link
* #sendDiscoveryRequest(ResourceType, Collection)}.
*/
abstract void sendDiscoveryRequest(ResourceType type, String versionInfo,
Collection<String> resources, String nonce, @Nullable String errorDetail);
/**
* Sends a client-initiated discovery request.
*/
final void sendDiscoveryRequest(ResourceType type, Collection<String> resources) {
String nonce;
switch (type) {
case LDS:
nonce = ldsRespNonce;
break;
case RDS:
nonce = rdsRespNonce;
break;
case CDS:
nonce = cdsRespNonce;
break;
case EDS:
nonce = edsRespNonce;
break;
case UNKNOWN:
default:
throw new AssertionError("Unknown resource type: " + type);
}
logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
sendDiscoveryRequest(type, getCurrentVersion(type), resources, nonce, null);
}
final void handleRpcResponse(
ResourceType type, String versionInfo, List<Any> resources, String nonce) {
if (closed) {
return;
}
responseReceived = true;
// Nonce in each response is echoed back in the following ACK/NACK request. It is
// used for management server to identify which response the client is ACKing/NACking.
// To avoid confusion, client-initiated requests will always use the nonce in
// most recently received responses of each resource type.
switch (type) {
case LDS:
ldsRespNonce = nonce;
handleLdsResponse(versionInfo, resources, nonce);
break;
case RDS:
rdsRespNonce = nonce;
handleRdsResponse(versionInfo, resources, nonce);
break;
case CDS:
cdsRespNonce = nonce;
handleCdsResponse(versionInfo, resources, nonce);
break;
case EDS:
edsRespNonce = nonce;
handleEdsResponse(versionInfo, resources, nonce);
break;
case UNKNOWN:
default:
logger.log(XdsLogLevel.WARNING, "Ignore an unknown type of DiscoveryResponse");
}
}
final void handleRpcError(Throwable t) {
handleRpcStreamClosed(Status.fromThrowable(t));
}
final void handleRpcCompleted() {
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
}
private void handleRpcStreamClosed(Status error) {
checkArgument(!error.isOk(), "unexpected OK status");
if (closed) {
return;
}
logger.log(
XdsLogLevel.ERROR,
"ADS stream closed with status {0}: {1}. Cause: {2}",
error.getCode(), error.getDescription(), error.getCause());
closed = true;
handleStreamClosed(error);
cleanUp();
if (responseReceived || retryBackoffPolicy == null) {
// Reset the backoff sequence if had received a response, or backoff sequence
// has never been initialized.
retryBackoffPolicy = backoffPolicyProvider.get();
}
long delayNanos = 0;
if (!responseReceived) {
delayNanos =
Math.max(
0,
retryBackoffPolicy.nextBackoffNanos()
- stopwatch.elapsed(TimeUnit.NANOSECONDS));
}
logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
rpcRetryTimer = timeService.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS);
}
private void close(Exception error) {
if (closed) {
return;
}
closed = true;
cleanUp();
sendError(error);
}
private void cleanUp() {
if (adsStream == this) {
adsStream = null;
}
}
}
private final class AdsStreamV2 extends AbstractAdsStream {
private StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryRequest> requestWriter;
@Override
void start() {
io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc
.AggregatedDiscoveryServiceStub stub =
io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.newStub(
xdsChannel.getManagedChannel());
StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryResponse> responseReaderV2 =
new StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryResponse>() {
@Override
public void onNext(final io.envoyproxy.envoy.api.v2.DiscoveryResponse response) {
runWithSynchronized(new Runnable() {
@Override
public void run() {
ResourceType type = ResourceType.fromTypeUrl(response.getTypeUrl());
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(XdsLogLevel.DEBUG, "Received {0} response:\n{1}",
type, respPrinter.print(response));
}
handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
response.getNonce());
}
});
}
@Override
public void onError(final Throwable t) {
runWithSynchronized(new Runnable() {
@Override
public void run() {
handleRpcError(t);
}
});
}
@Override
public void onCompleted() {
runWithSynchronized(new Runnable() {
@Override
public void run() {
handleRpcCompleted();
}
});
}
};
requestWriter = stub.withWaitForReady().streamAggregatedResources(responseReaderV2);
}
@Override
void sendDiscoveryRequest(ResourceType type, String versionInfo, Collection<String> resources,
String nonce, @Nullable String errorDetail) {
checkState(requestWriter != null, "ADS stream has not been started");
io.envoyproxy.envoy.api.v2.DiscoveryRequest.Builder builder =
io.envoyproxy.envoy.api.v2.DiscoveryRequest.newBuilder()
.setVersionInfo(versionInfo)
.setNode(node.toEnvoyProtoNodeV2())
.addAllResourceNames(resources)
.setTypeUrl(type.typeUrlV2())
.setResponseNonce(nonce);
if (errorDetail != null) {
com.google.rpc.Status error =
com.google.rpc.Status.newBuilder()
.setCode(Code.INVALID_ARGUMENT_VALUE) // FIXME(chengyuanzhang): use correct code
.setMessage(errorDetail)
.build();
builder.setErrorDetail(error);
}
io.envoyproxy.envoy.api.v2.DiscoveryRequest request = builder.build();
requestWriter.onNext(request);
logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", request);
}
@Override
void sendError(Exception error) {
requestWriter.onError(error);
}
}
private final class AdsStreamV3 extends AbstractAdsStream {
private StreamObserver<DiscoveryRequest> requestWriter;
@Override
void start() {
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
AggregatedDiscoveryServiceGrpc.newStub(xdsChannel.getManagedChannel());
StreamObserver<DiscoveryResponse> responseReader = new StreamObserver<DiscoveryResponse>() {
@Override
public void onNext(final DiscoveryResponse response) {
runWithSynchronized(new Runnable() {
@Override
public void run() {
ResourceType type = ResourceType.fromTypeUrl(response.getTypeUrl());
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(XdsLogLevel.DEBUG, "Received {0} response:\n{1}",
type, respPrinter.print(response));
}
handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
response.getNonce());
}
});
}
@Override
public void onError(final Throwable t) {
runWithSynchronized(new Runnable() {
@Override
public void run() {
handleRpcError(t);
}
});
}
@Override
public void onCompleted() {
runWithSynchronized(new Runnable() {
@Override
public void run() {
handleRpcCompleted();
}
});
}
};
requestWriter = stub.withWaitForReady().streamAggregatedResources(responseReader);
}
@Override
void sendDiscoveryRequest(ResourceType type, String versionInfo, Collection<String> resources,
String nonce, @Nullable String errorDetail) {
checkState(requestWriter != null, "ADS stream has not been started");
DiscoveryRequest.Builder builder =
DiscoveryRequest.newBuilder()
.setVersionInfo(versionInfo)
.setNode(node.toEnvoyProtoNode())
.addAllResourceNames(resources)
.setTypeUrl(type.typeUrl())
.setResponseNonce(nonce);
if (errorDetail != null) {
com.google.rpc.Status error =
com.google.rpc.Status.newBuilder()
.setCode(Code.INVALID_ARGUMENT_VALUE) // FIXME(chengyuanzhang): use correct code
.setMessage(errorDetail)
.build();
builder.setErrorDetail(error);
}
DiscoveryRequest request = builder.build();
requestWriter.onNext(request);
logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", respPrinter);
}
@Override
void sendError(Exception error) {
requestWriter.onError(error);
}
}
/**
* Convert protobuf message to human readable String format. Useful for protobuf messages
* containing {@link com.google.protobuf.Any} fields.
*/
@VisibleForTesting
static final class MessagePrinter {
private final JsonFormat.Printer printer;
@VisibleForTesting
MessagePrinter() {
TypeRegistry registry =
TypeRegistry.newBuilder()
.add(Listener.getDescriptor())
.add(io.envoyproxy.envoy.api.v2.Listener.getDescriptor())
.add(HttpConnectionManager.getDescriptor())
.add(
io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2
.HttpConnectionManager.getDescriptor())
.add(RouteConfiguration.getDescriptor())
.add(io.envoyproxy.envoy.api.v2.RouteConfiguration.getDescriptor())
.add(Cluster.getDescriptor())
.add(io.envoyproxy.envoy.api.v2.Cluster.getDescriptor())
.add(ClusterLoadAssignment.getDescriptor())
.add(io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.getDescriptor())
.build();
printer = JsonFormat.printer().usingTypeRegistry(registry);
}
@VisibleForTesting
String print(MessageOrBuilder message) {
String res;
try {
res = printer.print(message);
} catch (InvalidProtocolBufferException e) {
res = message + " (failed to pretty-print: " + e + ")";
}
return res;
}
}
}

View File

@ -0,0 +1,825 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import static io.grpc.xds.EnvoyProtoData.TRANSPORT_SOCKET_NAME_TLS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.config.route.v3.VirtualHost;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext;
import io.grpc.Status;
import io.grpc.internal.BackoffPolicy;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.EnvoyProtoData.StructOrError;
import io.grpc.xds.LoadStatsManager.LoadStatsStore;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
* XdsClient implementation for client side usages.
*/
final class ClientXdsClient extends AbstractXdsClient {
// Longest time to wait, since the subscription to some resource, for concluding its absence.
@VisibleForTesting
static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 =
"type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2"
+ ".HttpConnectionManager";
private static final String TYPE_URL_HTTP_CONNECTION_MANAGER =
"type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3"
+ ".HttpConnectionManager";
private final Object lock = new Object();
private final Map<String, ResourceSubscriber> ldsResourceSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> rdsResourceSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> cdsResourceSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> edsResourceSubscribers = new HashMap<>();
private final LoadStatsManager loadStatsManager = new LoadStatsManager();
private final LoadReportClient lrsClient;
private boolean reportingLoad;
ClientXdsClient(XdsChannel channel, Node node, ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier) {
super(channel, node, timeService, backoffPolicyProvider, stopwatchSupplier);
lrsClient = new LoadReportClient(loadStatsManager, channel, node, timeService,
backoffPolicyProvider, stopwatchSupplier);
}
@Override
protected void handleLdsResponse(String versionInfo, List<Any> resources, String nonce) {
// Unpack Listener messages.
List<Listener> listeners = new ArrayList<>(resources.size());
List<String> listenerNames = new ArrayList<>(resources.size());
try {
for (com.google.protobuf.Any res : resources) {
if (res.getTypeUrl().equals(ResourceType.LDS.typeUrlV2())) {
res = res.toBuilder().setTypeUrl(ResourceType.LDS.typeUrl()).build();
}
Listener listener = res.unpack(Listener.class);
listeners.add(listener);
listenerNames.add(listener.getName());
}
} catch (InvalidProtocolBufferException e) {
getLogger().log(XdsLogLevel.WARNING, "Failed to unpack Listeners in LDS response {0}", e);
nackResponse(ResourceType.LDS, nonce, "Malformed LDS response: " + e);
return;
}
getLogger().log(XdsLogLevel.INFO, "Received LDS response for resources: {0}", listenerNames);
// Unpack HttpConnectionManager messages.
Map<String, HttpConnectionManager> httpConnectionManagers = new HashMap<>(listeners.size());
try {
for (Listener listener : listeners) {
Any apiListener = listener.getApiListener().getApiListener();
if (apiListener.getTypeUrl().equals(TYPE_URL_HTTP_CONNECTION_MANAGER_V2)) {
apiListener =
apiListener.toBuilder().setTypeUrl(TYPE_URL_HTTP_CONNECTION_MANAGER).build();
}
HttpConnectionManager hcm = apiListener.unpack(HttpConnectionManager.class);
httpConnectionManagers.put(listener.getName(), hcm);
}
} catch (InvalidProtocolBufferException e) {
getLogger().log(
XdsLogLevel.WARNING,
"Failed to unpack HttpConnectionManagers in Listeners of LDS response {0}", e);
nackResponse(ResourceType.LDS, nonce, "Malformed LDS response: " + e);
return;
}
Map<String, LdsUpdate> ldsUpdates = new HashMap<>();
Set<String> rdsNames = new HashSet<>();
String errorMessage = null;
for (Map.Entry<String, HttpConnectionManager> entry : httpConnectionManagers.entrySet()) {
String listenerName = entry.getKey();
HttpConnectionManager hcm = entry.getValue();
LdsUpdate.Builder updateBuilder = LdsUpdate.newBuilder();
if (hcm.hasRouteConfig()) {
for (VirtualHost virtualHostProto : hcm.getRouteConfig().getVirtualHostsList()) {
StructOrError<EnvoyProtoData.VirtualHost> virtualHost =
EnvoyProtoData.VirtualHost.fromEnvoyProtoVirtualHost(virtualHostProto);
if (virtualHost.getErrorDetail() != null) {
errorMessage = "Listener " + listenerName + " contains invalid virtual host: "
+ virtualHost.getErrorDetail();
break;
} else {
updateBuilder.addVirtualHost(virtualHost.getStruct());
}
}
} else if (hcm.hasRds()) {
Rds rds = hcm.getRds();
if (!rds.getConfigSource().hasAds()) {
errorMessage = "Listener " + listenerName + " with RDS config_source not set to ADS";
} else {
updateBuilder.setRdsName(rds.getRouteConfigName());
rdsNames.add(rds.getRouteConfigName());
}
} else {
errorMessage = "Listener " + listenerName + " without inline RouteConfiguration or RDS";
}
if (errorMessage != null) {
break;
}
if (hcm.hasCommonHttpProtocolOptions()) {
HttpProtocolOptions options = hcm.getCommonHttpProtocolOptions();
if (options.hasMaxStreamDuration()) {
updateBuilder.setHttpMaxStreamDurationNano(
Durations.toNanos(options.getMaxStreamDuration()));
}
}
ldsUpdates.put(listenerName, updateBuilder.build());
}
if (errorMessage != null) {
nackResponse(ResourceType.LDS, nonce, errorMessage);
return;
}
ackResponse(ResourceType.LDS, versionInfo, nonce);
for (String resource : ldsResourceSubscribers.keySet()) {
ResourceSubscriber subscriber = ldsResourceSubscribers.get(resource);
if (ldsUpdates.containsKey(resource)) {
subscriber.onData(ldsUpdates.get(resource));
} else {
subscriber.onAbsent();
}
}
for (String resource : rdsResourceSubscribers.keySet()) {
if (!rdsNames.contains(resource)) {
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resource);
subscriber.onAbsent();
}
}
}
@Override
protected void handleRdsResponse(String versionInfo, List<Any> resources, String nonce) {
// Unpack RouteConfiguration messages.
Map<String, RouteConfiguration> routeConfigs = new HashMap<>(resources.size());
try {
for (com.google.protobuf.Any res : resources) {
if (res.getTypeUrl().equals(ResourceType.RDS.typeUrlV2())) {
res = res.toBuilder().setTypeUrl(ResourceType.RDS.typeUrl()).build();
}
RouteConfiguration rc = res.unpack(RouteConfiguration.class);
routeConfigs.put(rc.getName(), rc);
}
} catch (InvalidProtocolBufferException e) {
getLogger().log(
XdsLogLevel.WARNING, "Failed to unpack RouteConfiguration in RDS response {0}", e);
nackResponse(ResourceType.RDS, nonce, "Malformed RDS response: " + e);
return;
}
getLogger().log(
XdsLogLevel.INFO, "Received RDS response for resources: {0}", routeConfigs.keySet());
Map<String, RdsUpdate> rdsUpdates = new HashMap<>();
String errorMessage = null;
for (Map.Entry<String, RouteConfiguration> entry : routeConfigs.entrySet()) {
String routeConfigName = entry.getKey();
RouteConfiguration routeConfig = entry.getValue();
List<EnvoyProtoData.VirtualHost> virtualHosts =
new ArrayList<>(routeConfig.getVirtualHostsCount());
for (VirtualHost virtualHostProto : routeConfig.getVirtualHostsList()) {
StructOrError<EnvoyProtoData.VirtualHost> virtualHost =
EnvoyProtoData.VirtualHost.fromEnvoyProtoVirtualHost(virtualHostProto);
if (virtualHost.getErrorDetail() != null) {
errorMessage = "RouteConfiguration " + routeConfigName
+ " contains invalid virtual host: " + virtualHost.getErrorDetail();
break;
} else {
virtualHosts.add(virtualHost.getStruct());
}
}
if (errorMessage != null) {
break;
}
rdsUpdates.put(routeConfigName, RdsUpdate.fromVirtualHosts(virtualHosts));
}
if (errorMessage != null) {
nackResponse(ResourceType.RDS, nonce, errorMessage);
return;
}
ackResponse(ResourceType.RDS, versionInfo, nonce);
for (String resource : rdsResourceSubscribers.keySet()) {
if (rdsUpdates.containsKey(resource)) {
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resource);
subscriber.onData(rdsUpdates.get(resource));
}
}
}
@Override
protected void handleCdsResponse(String versionInfo, List<Any> resources, String nonce) {
// Unpack Cluster messages.
List<Cluster> clusters = new ArrayList<>(resources.size());
List<String> clusterNames = new ArrayList<>(resources.size());
try {
for (com.google.protobuf.Any res : resources) {
if (res.getTypeUrl().equals(ResourceType.CDS.typeUrlV2())) {
res = res.toBuilder().setTypeUrl(ResourceType.CDS.typeUrl()).build();
}
Cluster cluster = res.unpack(Cluster.class);
clusters.add(cluster);
clusterNames.add(cluster.getName());
}
} catch (InvalidProtocolBufferException e) {
getLogger().log(XdsLogLevel.WARNING, "Failed to unpack Clusters in CDS response {0}", e);
nackResponse(ResourceType.CDS, nonce, "Malformed CDS response: " + e);
return;
}
getLogger().log(XdsLogLevel.INFO, "Received CDS response for resources: {0}", clusterNames);
String errorMessage = null;
// Cluster information update for requested clusters received in this CDS response.
Map<String, CdsUpdate> cdsUpdates = new HashMap<>();
// CDS responses represents the state of the world, EDS services not referenced by
// Clusters are those no longer exist.
Set<String> edsServices = new HashSet<>();
for (Cluster cluster : clusters) {
String clusterName = cluster.getName();
// Skip information for clusters not requested.
// Management server is required to always send newly requested resources, even if they
// may have been sent previously (proactively). Thus, client does not need to cache
// unrequested resources.
if (!cdsResourceSubscribers.containsKey(clusterName)) {
continue;
}
CdsUpdate.Builder updateBuilder = CdsUpdate.newBuilder();
updateBuilder.setClusterName(clusterName);
// The type field must be set to EDS.
if (!cluster.getType().equals(DiscoveryType.EDS)) {
errorMessage = "Cluster " + clusterName + " : only EDS discovery type is supported "
+ "in gRPC.";
break;
}
// In the eds_cluster_config field, the eds_config field must be set to indicate to
// use EDS (must be set to use ADS).
EdsClusterConfig edsClusterConfig = cluster.getEdsClusterConfig();
if (!edsClusterConfig.getEdsConfig().hasAds()) {
errorMessage = "Cluster " + clusterName + " : field eds_cluster_config must be set to "
+ "indicate to use EDS over ADS.";
break;
}
// If the service_name field is set, that value will be used for the EDS request.
if (!edsClusterConfig.getServiceName().isEmpty()) {
updateBuilder.setEdsServiceName(edsClusterConfig.getServiceName());
edsServices.add(edsClusterConfig.getServiceName());
} else {
edsServices.add(clusterName);
}
// The lb_policy field must be set to ROUND_ROBIN.
if (!cluster.getLbPolicy().equals(LbPolicy.ROUND_ROBIN)) {
errorMessage = "Cluster " + clusterName + " : only round robin load balancing policy is "
+ "supported in gRPC.";
break;
}
updateBuilder.setLbPolicy("round_robin");
// If the lrs_server field is set, it must have its self field set, in which case the
// client should use LRS for load reporting. Otherwise (the lrs_server field is not set),
// LRS load reporting will be disabled.
if (cluster.hasLrsServer()) {
if (!cluster.getLrsServer().hasSelf()) {
errorMessage = "Cluster " + clusterName + " : only support enabling LRS for the same "
+ "management server.";
break;
}
updateBuilder.setLrsServerName("");
}
try {
EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext =
getTlsContextFromCluster(cluster);
if (upstreamTlsContext != null && upstreamTlsContext.getCommonTlsContext() != null) {
updateBuilder.setUpstreamTlsContext(upstreamTlsContext);
}
} catch (InvalidProtocolBufferException e) {
errorMessage = "Cluster " + clusterName + " : " + e.getMessage();
break;
}
cdsUpdates.put(clusterName, updateBuilder.build());
}
if (errorMessage != null) {
nackResponse(ResourceType.CDS, nonce, errorMessage);
return;
}
ackResponse(ResourceType.CDS, versionInfo, nonce);
for (String resource : cdsResourceSubscribers.keySet()) {
ResourceSubscriber subscriber = cdsResourceSubscribers.get(resource);
if (cdsUpdates.containsKey(resource)) {
subscriber.onData(cdsUpdates.get(resource));
} else {
subscriber.onAbsent();
}
}
for (String resource : edsResourceSubscribers.keySet()) {
ResourceSubscriber subscriber = edsResourceSubscribers.get(resource);
if (!edsServices.contains(resource)) {
subscriber.onAbsent();
}
}
}
@Nullable
private static EnvoyServerProtoData.UpstreamTlsContext getTlsContextFromCluster(Cluster cluster)
throws InvalidProtocolBufferException {
if (cluster.hasTransportSocket()
&& TRANSPORT_SOCKET_NAME_TLS.equals(cluster.getTransportSocket().getName())) {
Any any = cluster.getTransportSocket().getTypedConfig();
return EnvoyServerProtoData.UpstreamTlsContext.fromEnvoyProtoUpstreamTlsContext(
any.unpack(UpstreamTlsContext.class));
}
return null;
}
@Override
protected void handleEdsResponse(String versionInfo, List<Any> resources, String nonce) {
// Unpack ClusterLoadAssignment messages.
List<ClusterLoadAssignment> clusterLoadAssignments = new ArrayList<>(resources.size());
List<String> claNames = new ArrayList<>(resources.size());
try {
for (com.google.protobuf.Any res : resources) {
if (res.getTypeUrl().equals(ResourceType.EDS.typeUrlV2())) {
res = res.toBuilder().setTypeUrl(ResourceType.EDS.typeUrl()).build();
}
ClusterLoadAssignment assignment = res.unpack(ClusterLoadAssignment.class);
clusterLoadAssignments.add(assignment);
claNames.add(assignment.getClusterName());
}
} catch (InvalidProtocolBufferException e) {
getLogger().log(
XdsLogLevel.WARNING, "Failed to unpack ClusterLoadAssignments in EDS response {0}", e);
nackResponse(ResourceType.EDS, nonce, "Malformed EDS response: " + e);
return;
}
getLogger().log(XdsLogLevel.INFO, "Received EDS response for resources: {0}", claNames);
String errorMessage = null;
// Endpoint information updates for requested clusters received in this EDS response.
Map<String, EdsUpdate> edsUpdates = new HashMap<>();
// Walk through each ClusterLoadAssignment message. If any of them for requested clusters
// contain invalid information for gRPC's load balancing usage, the whole response is rejected.
for (ClusterLoadAssignment assignment : clusterLoadAssignments) {
String clusterName = assignment.getClusterName();
// Skip information for clusters not requested.
// Management server is required to always send newly requested resources, even if they
// may have been sent previously (proactively). Thus, client does not need to cache
// unrequested resources.
if (!edsResourceSubscribers.containsKey(clusterName)) {
continue;
}
EdsUpdate.Builder updateBuilder = EdsUpdate.newBuilder();
updateBuilder.setClusterName(clusterName);
Set<Integer> priorities = new HashSet<>();
int maxPriority = -1;
for (io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints localityLbEndpoints
: assignment.getEndpointsList()) {
// Filter out localities without or with 0 weight.
if (!localityLbEndpoints.hasLoadBalancingWeight()
|| localityLbEndpoints.getLoadBalancingWeight().getValue() < 1) {
continue;
}
int localityPriority = localityLbEndpoints.getPriority();
if (localityPriority < 0) {
errorMessage =
"ClusterLoadAssignment " + clusterName + " : locality with negative priority.";
break;
}
maxPriority = Math.max(maxPriority, localityPriority);
priorities.add(localityPriority);
// The endpoint field of each lb_endpoints must be set.
// Inside of it: the address field must be set.
for (LbEndpoint lbEndpoint : localityLbEndpoints.getLbEndpointsList()) {
if (!lbEndpoint.getEndpoint().hasAddress()) {
errorMessage = "ClusterLoadAssignment " + clusterName + " : endpoint with no address.";
break;
}
}
if (errorMessage != null) {
break;
}
// Note endpoints with health status other than UNHEALTHY and UNKNOWN are still
// handed over to watching parties. It is watching parties' responsibility to
// filter out unhealthy endpoints. See EnvoyProtoData.LbEndpoint#isHealthy().
updateBuilder.addLocalityLbEndpoints(
Locality.fromEnvoyProtoLocality(localityLbEndpoints.getLocality()),
LocalityLbEndpoints.fromEnvoyProtoLocalityLbEndpoints(localityLbEndpoints));
}
if (errorMessage != null) {
break;
}
if (priorities.size() != maxPriority + 1) {
errorMessage = "ClusterLoadAssignment " + clusterName + " : sparse priorities.";
break;
}
for (ClusterLoadAssignment.Policy.DropOverload dropOverload
: assignment.getPolicy().getDropOverloadsList()) {
updateBuilder.addDropPolicy(DropOverload.fromEnvoyProtoDropOverload(dropOverload));
}
EdsUpdate update = updateBuilder.build();
edsUpdates.put(clusterName, update);
}
if (errorMessage != null) {
nackResponse(ResourceType.EDS, nonce, errorMessage);
return;
}
ackResponse(ResourceType.EDS, versionInfo, nonce);
for (String resource : edsResourceSubscribers.keySet()) {
ResourceSubscriber subscriber = edsResourceSubscribers.get(resource);
if (edsUpdates.containsKey(resource)) {
subscriber.onData(edsUpdates.get(resource));
}
}
}
@Override
protected void handleStreamClosed(Status error) {
cleanUpResourceTimers();
for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) {
subscriber.onError(error);
}
for (ResourceSubscriber subscriber : rdsResourceSubscribers.values()) {
subscriber.onError(error);
}
for (ResourceSubscriber subscriber : cdsResourceSubscribers.values()) {
subscriber.onError(error);
}
for (ResourceSubscriber subscriber : edsResourceSubscribers.values()) {
subscriber.onError(error);
}
}
@Override
protected void handleStreamRestarted() {
for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) {
subscriber.restartTimer();
}
for (ResourceSubscriber subscriber : rdsResourceSubscribers.values()) {
subscriber.restartTimer();
}
for (ResourceSubscriber subscriber : cdsResourceSubscribers.values()) {
subscriber.restartTimer();
}
for (ResourceSubscriber subscriber : edsResourceSubscribers.values()) {
subscriber.restartTimer();
}
}
@Override
protected void handleShutdown() {
if (reportingLoad) {
lrsClient.stopLoadReporting();
}
cleanUpResourceTimers();
}
@Nullable
@Override
Collection<String> getSubscribedResources(ResourceType type) {
switch (type) {
case LDS:
return ldsResourceSubscribers.isEmpty() ? null : ldsResourceSubscribers.keySet();
case RDS:
return rdsResourceSubscribers.isEmpty() ? null : rdsResourceSubscribers.keySet();
case CDS:
return cdsResourceSubscribers.isEmpty() ? null : cdsResourceSubscribers.keySet();
case EDS:
return edsResourceSubscribers.isEmpty() ? null : edsResourceSubscribers.keySet();
case UNKNOWN:
default:
throw new AssertionError("Unknown resource type");
}
}
@Override
void watchLdsResource(String resourceName, LdsResourceWatcher watcher) {
synchronized (lock) {
ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName);
if (subscriber == null) {
getLogger().log(XdsLogLevel.INFO, "Subscribe CDS resource {0}", resourceName);
subscriber = new ResourceSubscriber(ResourceType.LDS, resourceName);
ldsResourceSubscribers.put(resourceName, subscriber);
adjustResourceSubscription(ResourceType.LDS);
}
subscriber.addWatcher(watcher);
}
}
@Override
void cancelLdsResourceWatch(String resourceName, LdsResourceWatcher watcher) {
synchronized (lock) {
ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
getLogger().log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName);
ldsResourceSubscribers.remove(resourceName);
adjustResourceSubscription(ResourceType.LDS);
}
}
}
@Override
void watchRdsResource(String resourceName, RdsResourceWatcher watcher) {
synchronized (lock) {
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName);
if (subscriber == null) {
getLogger().log(XdsLogLevel.INFO, "Subscribe RDS resource {0}", resourceName);
subscriber = new ResourceSubscriber(ResourceType.RDS, resourceName);
rdsResourceSubscribers.put(resourceName, subscriber);
adjustResourceSubscription(ResourceType.RDS);
}
subscriber.addWatcher(watcher);
}
}
@Override
void cancelRdsResourceWatch(String resourceName, RdsResourceWatcher watcher) {
synchronized (lock) {
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
getLogger().log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName);
rdsResourceSubscribers.remove(resourceName);
adjustResourceSubscription(ResourceType.RDS);
}
}
}
@Override
void watchCdsResource(String resourceName, CdsResourceWatcher watcher) {
synchronized (lock) {
ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName);
if (subscriber == null) {
getLogger().log(XdsLogLevel.INFO, "Subscribe CDS resource {0}", resourceName);
subscriber = new ResourceSubscriber(ResourceType.CDS, resourceName);
cdsResourceSubscribers.put(resourceName, subscriber);
adjustResourceSubscription(ResourceType.CDS);
}
subscriber.addWatcher(watcher);
}
}
@Override
void cancelCdsResourceWatch(String resourceName, CdsResourceWatcher watcher) {
synchronized (lock) {
ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
getLogger().log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName);
cdsResourceSubscribers.remove(resourceName);
adjustResourceSubscription(ResourceType.CDS);
}
}
}
@Override
void watchEdsResource(String resourceName, EdsResourceWatcher watcher) {
synchronized (lock) {
ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName);
if (subscriber == null) {
getLogger().log(XdsLogLevel.INFO, "Subscribe EDS resource {0}", resourceName);
subscriber = new ResourceSubscriber(ResourceType.EDS, resourceName);
edsResourceSubscribers.put(resourceName, subscriber);
adjustResourceSubscription(ResourceType.EDS);
}
subscriber.addWatcher(watcher);
}
}
@Override
void cancelEdsResourceWatch(String resourceName, EdsResourceWatcher watcher) {
synchronized (lock) {
ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
getLogger().log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName);
edsResourceSubscribers.remove(resourceName);
adjustResourceSubscription(ResourceType.EDS);
}
}
}
@Override
LoadStatsStore addClientStats(String clusterName, @Nullable String clusterServiceName) {
synchronized (lock) {
LoadStatsStore loadStatsStore = loadStatsManager
.addLoadStats(clusterName, clusterServiceName);
if (!reportingLoad) {
lrsClient.startLoadReporting();
reportingLoad = true;
}
return loadStatsStore;
}
}
@Override
void removeClientStats(String clusterName, @Nullable String clusterServiceName) {
synchronized (lock) {
loadStatsManager.removeLoadStats(clusterName, clusterServiceName);
}
}
private void cleanUpResourceTimers() {
for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) {
subscriber.stopTimer();
}
for (ResourceSubscriber subscriber : rdsResourceSubscribers.values()) {
subscriber.stopTimer();
}
for (ResourceSubscriber subscriber : cdsResourceSubscribers.values()) {
subscriber.stopTimer();
}
for (ResourceSubscriber subscriber : edsResourceSubscribers.values()) {
subscriber.stopTimer();
}
}
@Override
protected void runWithSynchronized(Runnable runnable) {
synchronized (lock) {
runnable.run();
}
}
/**
* Tracks a single subscribed resource.
*/
private final class ResourceSubscriber {
private final ResourceType type;
private final String resource;
private final Set<ResourceWatcher> watchers = new HashSet<>();
private ResourceUpdate data;
private boolean absent;
private ScheduledFuture<?> respTimer;
ResourceSubscriber(ResourceType type, String resource) {
this.type = type;
this.resource = resource;
if (isInBackoff()) {
return;
}
restartTimer();
}
void addWatcher(ResourceWatcher watcher) {
checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher);
watchers.add(watcher);
if (data != null) {
notifyWatcher(watcher, data);
} else if (absent) {
watcher.onResourceDoesNotExist(resource);
}
}
void removeWatcher(ResourceWatcher watcher) {
checkArgument(watchers.contains(watcher), "watcher %s not registered", watcher);
watchers.remove(watcher);
}
// FIXME(chengyuanzhang): should only restart timer if the resource is still unresolved.
void restartTimer() {
class ResourceNotFound implements Runnable {
@Override
public void run() {
synchronized (lock) {
getLogger().log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout",
type, resource);
respTimer = null;
onAbsent();
}
}
@Override
public String toString() {
return type + this.getClass().getSimpleName();
}
}
respTimer = getTimeService().schedule(
new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
}
void stopTimer() {
if (respTimer != null && !respTimer.isDone()) {
respTimer.cancel(false);
respTimer = null;
}
}
boolean isWatched() {
return !watchers.isEmpty();
}
void onData(ResourceUpdate data) {
if (respTimer != null && !respTimer.isDone()) {
respTimer.cancel(false);
respTimer = null;
}
ResourceUpdate oldData = this.data;
this.data = data;
absent = false;
if (!Objects.equals(oldData, data)) {
for (ResourceWatcher watcher : watchers) {
notifyWatcher(watcher, data);
}
}
}
void onAbsent() {
if (respTimer != null && !respTimer.isDone()) { // too early to conclude absence
return;
}
getLogger().log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
if (!absent) {
data = null;
absent = true;
for (ResourceWatcher watcher : watchers) {
// TODO(chengyuanzhang): should invoke callback with watcher's own executor.
watcher.onResourceDoesNotExist(resource);
}
}
}
void onError(Status error) {
if (respTimer != null && !respTimer.isDone()) {
respTimer.cancel(false);
respTimer = null;
}
for (ResourceWatcher watcher : watchers) {
// TODO(chengyuanzhang): should invoke callback with watcher's own executor.
watcher.onError(error);
}
}
private void notifyWatcher(ResourceWatcher watcher, ResourceUpdate update) {
// TODO(chengyuanzhang): should invoke callbacks with watcher's own executor.
switch (type) {
case LDS:
((LdsResourceWatcher) watcher).onChanged((LdsUpdate) update);
break;
case RDS:
((RdsResourceWatcher) watcher).onChanged((RdsUpdate) update);
break;
case CDS:
((CdsResourceWatcher) watcher).onChanged((CdsUpdate) update);
break;
case EDS:
((EdsResourceWatcher) watcher).onChanged((EdsUpdate) update);
break;
case UNKNOWN:
default:
throw new AssertionError("should never be here");
}
}
}
}

View File

@ -0,0 +1,236 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import io.envoyproxy.envoy.config.core.v3.Address;
import io.envoyproxy.envoy.config.listener.v3.FilterChain;
import io.envoyproxy.envoy.config.listener.v3.FilterChainMatch;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
* XdsClient implementation for server side usages.
*/
final class ServerXdsClient extends AbstractXdsClient {
// Longest time to wait, since the subscription to some resource, for concluding its absence.
@VisibleForTesting
static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
private final SynchronizationContext syncContext;
@Nullable
private ListenerWatcher listenerWatcher;
private int listenerPort = -1;
@Nullable
private ScheduledHandle ldsRespTimer;
ServerXdsClient(XdsChannel channel, Node node, SynchronizationContext syncContext,
ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
super(channel, node, timeService, backoffPolicyProvider, stopwatchSupplier);
this.syncContext = checkNotNull(syncContext, "syncContext");
}
@Override
void watchListenerData(int port, ListenerWatcher watcher) {
checkState(listenerWatcher == null, "ListenerWatcher already registered");
listenerWatcher = checkNotNull(watcher, "watcher");
checkArgument(port > 0, "port needs to be > 0");
this.listenerPort = port;
getLogger().log(XdsLogLevel.INFO, "Started watching listener for port {0}", port);
updateNodeMetadataForListenerRequest(port);
adjustResourceSubscription(ResourceType.LDS);
if (!isInBackoff()) {
ldsRespTimer =
syncContext
.schedule(
new ListenerResourceFetchTimeoutTask(":" + port),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, getTimeService());
}
}
@Nullable
@Override
Collection<String> getSubscribedResources(ResourceType type) {
if (type != ResourceType.LDS || listenerWatcher == null) {
return null;
}
return Collections.emptyList();
}
/** In case of Listener watcher metadata to be updated to include port. */
private void updateNodeMetadataForListenerRequest(int port) {
Map<String, Object> newMetadata = new HashMap<>();
if (node.getMetadata() != null) {
newMetadata.putAll(node.getMetadata());
}
newMetadata.put("TRAFFICDIRECTOR_PROXYLESS", "1");
// TODO(sanjaypujare): eliminate usage of listening_addresses.
EnvoyProtoData.Address listeningAddress =
new EnvoyProtoData.Address("0.0.0.0", port);
node =
node.toBuilder().setMetadata(newMetadata).addListeningAddresses(listeningAddress).build();
}
@Override
protected void handleLdsResponse(String versionInfo, List<Any> resources, String nonce) {
// Unpack Listener messages.
Listener requestedListener = null;
getLogger().log(XdsLogLevel.DEBUG, "Listener count: {0}", resources.size());
try {
for (com.google.protobuf.Any res : resources) {
if (res.getTypeUrl().equals(ResourceType.LDS.typeUrlV2())) {
res = res.toBuilder().setTypeUrl(ResourceType.LDS.typeUrl()).build();
}
Listener listener = res.unpack(Listener.class);
getLogger().log(XdsLogLevel.DEBUG, "Found listener {0}", listener.toString());
if (isRequestedListener(listener)) {
requestedListener = listener;
getLogger().log(XdsLogLevel.DEBUG, "Requested listener found: {0}", listener.getName());
}
}
} catch (InvalidProtocolBufferException e) {
getLogger().log(XdsLogLevel.WARNING, "Failed to unpack Listeners in LDS response {0}", e);
nackResponse(ResourceType.LDS, nonce, "Malformed LDS response: " + e);
return;
}
ListenerUpdate listenerUpdate = null;
if (requestedListener != null) {
if (ldsRespTimer != null) {
ldsRespTimer.cancel();
ldsRespTimer = null;
}
try {
listenerUpdate = ListenerUpdate.newBuilder()
.setListener(EnvoyServerProtoData.Listener.fromEnvoyProtoListener(requestedListener))
.build();
} catch (InvalidProtocolBufferException e) {
getLogger().log(XdsLogLevel.WARNING, "Failed to unpack Listener in LDS response {0}", e);
nackResponse(ResourceType.LDS, nonce, "Malformed LDS response: " + e);
return;
}
} else {
if (ldsRespTimer == null) {
listenerWatcher.onResourceDoesNotExist(":" + listenerPort);
}
}
ackResponse(ResourceType.LDS, versionInfo, nonce);
if (listenerUpdate != null) {
listenerWatcher.onListenerChanged(listenerUpdate);
}
}
private boolean isRequestedListener(Listener listener) {
// TODO(sanjaypujare): check listener.getName() once we know what xDS server returns
return isAddressMatching(listener.getAddress())
&& hasMatchingFilter(listener.getFilterChainsList());
}
private boolean isAddressMatching(Address address) {
// TODO(sanjaypujare): check IP address once we know xDS server will include it
return address.hasSocketAddress()
&& (address.getSocketAddress().getPortValue() == listenerPort);
}
private boolean hasMatchingFilter(List<FilterChain> filterChainsList) {
// TODO(sanjaypujare): if myIp to be checked against filterChainMatch.getPrefixRangesList()
for (FilterChain filterChain : filterChainsList) {
FilterChainMatch filterChainMatch = filterChain.getFilterChainMatch();
if (listenerPort == filterChainMatch.getDestinationPort().getValue()) {
return true;
}
}
return false;
}
@Override
protected void handleStreamClosed(Status error) {
cleanUpResourceTimer();
if (listenerWatcher != null) {
listenerWatcher.onError(error);
}
}
@Override
protected void handleStreamRestarted() {
if (listenerWatcher != null) {
ldsRespTimer =
syncContext
.schedule(
new ListenerResourceFetchTimeoutTask(":" + listenerPort),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, getTimeService());
}
}
@Override
protected void handleShutdown() {
cleanUpResourceTimer();
}
@Override
protected void runWithSynchronized(Runnable runnable) {
syncContext.execute(runnable);
}
private void cleanUpResourceTimer() {
if (ldsRespTimer != null) {
ldsRespTimer.cancel();
ldsRespTimer = null;
}
}
@VisibleForTesting
final class ListenerResourceFetchTimeoutTask implements Runnable {
private String resourceName;
ListenerResourceFetchTimeoutTask(String resourceName) {
this.resourceName = resourceName;
}
@Override
public void run() {
getLogger().log(
XdsLogLevel.WARNING,
"Did not receive resource info {0} after {1} seconds, conclude it absent",
resourceName, INITIAL_RESOURCE_FETCH_TIMEOUT_SEC);
ldsRespTimer = null;
listenerWatcher.onResourceDoesNotExist(resourceName);
}
}
}

View File

@ -0,0 +1,276 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.XdsClientTestHelper.buildCluster;
import static io.grpc.xds.XdsClientTestHelper.buildClusterLoadAssignment;
import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryResponse;
import static io.grpc.xds.XdsClientTestHelper.buildDropOverload;
import static io.grpc.xds.XdsClientTestHelper.buildLbEndpoint;
import static io.grpc.xds.XdsClientTestHelper.buildListener;
import static io.grpc.xds.XdsClientTestHelper.buildLocalityLbEndpoints;
import static io.grpc.xds.XdsClientTestHelper.buildRouteConfiguration;
import static io.grpc.xds.XdsClientTestHelper.buildVirtualHost;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Any;
import io.envoyproxy.envoy.config.core.v3.HealthStatus;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.xds.AbstractXdsClient.MessagePrinter;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Tests for {@link AbstractXdsClient}.
*/
@RunWith(JUnit4.class)
public class AbstractXdsClientTest {
@Test
public void messagePrinter_printLdsResponse() {
MessagePrinter printer = new MessagePrinter();
List<Any> listeners = ImmutableList.of(
Any.pack(buildListener("foo.googleapis.com:8080",
Any.pack(
HttpConnectionManager.newBuilder()
.setRouteConfig(
buildRouteConfiguration("route-foo.googleapis.com",
ImmutableList.of(
buildVirtualHost(
ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"),
"cluster.googleapis.com"))))
.build()))));
DiscoveryResponse response =
buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000");
String expectedString = "{\n"
+ " \"versionInfo\": \"0\",\n"
+ " \"resources\": [{\n"
+ " \"@type\": \"type.googleapis.com/envoy.config.listener.v3.Listener\",\n"
+ " \"name\": \"foo.googleapis.com:8080\",\n"
+ " \"address\": {\n"
+ " },\n"
+ " \"filterChains\": [{\n"
+ " }],\n"
+ " \"apiListener\": {\n"
+ " \"apiListener\": {\n"
+ " \"@type\": \"type.googleapis.com/envoy.extensions.filters.network"
+ ".http_connection_manager.v3.HttpConnectionManager\",\n"
+ " \"routeConfig\": {\n"
+ " \"name\": \"route-foo.googleapis.com\",\n"
+ " \"virtualHosts\": [{\n"
+ " \"name\": \"virtualhost00.googleapis.com\",\n"
+ " \"domains\": [\"foo.googleapis.com\", \"bar.googleapis.com\"],\n"
+ " \"routes\": [{\n"
+ " \"match\": {\n"
+ " \"prefix\": \"\"\n"
+ " },\n"
+ " \"route\": {\n"
+ " \"cluster\": \"cluster.googleapis.com\"\n"
+ " }\n"
+ " }]\n"
+ " }]\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " }],\n"
+ " \"typeUrl\": \"type.googleapis.com/envoy.config.listener.v3.Listener\",\n"
+ " \"nonce\": \"0000\"\n"
+ "}";
String res = printer.print(response);
assertThat(res).isEqualTo(expectedString);
}
@Test
public void messagePrinter_printRdsResponse() {
MessagePrinter printer = new MessagePrinter();
List<Any> routeConfigs =
ImmutableList.of(
Any.pack(
buildRouteConfiguration(
"route-foo.googleapis.com",
ImmutableList.of(
buildVirtualHost(
ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"),
"cluster.googleapis.com")))));
DiscoveryResponse response =
buildDiscoveryResponse("213", routeConfigs, ResourceType.RDS.typeUrl(), "0052");
String expectedString = "{\n"
+ " \"versionInfo\": \"213\",\n"
+ " \"resources\": [{\n"
+ " \"@type\": \"type.googleapis.com/envoy.config.route.v3.RouteConfiguration\",\n"
+ " \"name\": \"route-foo.googleapis.com\",\n"
+ " \"virtualHosts\": [{\n"
+ " \"name\": \"virtualhost00.googleapis.com\",\n"
+ " \"domains\": [\"foo.googleapis.com\", \"bar.googleapis.com\"],\n"
+ " \"routes\": [{\n"
+ " \"match\": {\n"
+ " \"prefix\": \"\"\n"
+ " },\n"
+ " \"route\": {\n"
+ " \"cluster\": \"cluster.googleapis.com\"\n"
+ " }\n"
+ " }]\n"
+ " }]\n"
+ " }],\n"
+ " \"typeUrl\": \"type.googleapis.com/envoy.config.route.v3.RouteConfiguration\",\n"
+ " \"nonce\": \"0052\"\n"
+ "}";
String res = printer.print(response);
assertThat(res).isEqualTo(expectedString);
}
@Test
public void messagePrinter_printCdsResponse() {
MessagePrinter printer = new MessagePrinter();
List<Any> clusters = ImmutableList.of(
Any.pack(buildCluster("cluster-bar.googleapis.com", "service-blaze:cluster-bar", true)),
Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)));
DiscoveryResponse response =
buildDiscoveryResponse("14", clusters, ResourceType.CDS.typeUrl(), "8");
String expectedString = "{\n"
+ " \"versionInfo\": \"14\",\n"
+ " \"resources\": [{\n"
+ " \"@type\": \"type.googleapis.com/envoy.config.cluster.v3.Cluster\",\n"
+ " \"name\": \"cluster-bar.googleapis.com\",\n"
+ " \"type\": \"EDS\",\n"
+ " \"edsClusterConfig\": {\n"
+ " \"edsConfig\": {\n"
+ " \"ads\": {\n"
+ " }\n"
+ " },\n"
+ " \"serviceName\": \"service-blaze:cluster-bar\"\n"
+ " },\n"
+ " \"lrsServer\": {\n"
+ " \"self\": {\n"
+ " }\n"
+ " }\n"
+ " }, {\n"
+ " \"@type\": \"type.googleapis.com/envoy.config.cluster.v3.Cluster\",\n"
+ " \"name\": \"cluster-foo.googleapis.com\",\n"
+ " \"type\": \"EDS\",\n"
+ " \"edsClusterConfig\": {\n"
+ " \"edsConfig\": {\n"
+ " \"ads\": {\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " }],\n"
+ " \"typeUrl\": \"type.googleapis.com/envoy.config.cluster.v3.Cluster\",\n"
+ " \"nonce\": \"8\"\n"
+ "}";
String res = printer.print(response);
assertThat(res).isEqualTo(expectedString);
}
@Test
public void messagePrinter_printEdsResponse() {
MessagePrinter printer = new MessagePrinter();
List<Any> clusterLoadAssignments = ImmutableList.of(
Any.pack(buildClusterLoadAssignment("cluster-foo.googleapis.com",
ImmutableList.of(
buildLocalityLbEndpoints("region1", "zone1", "subzone1",
ImmutableList.of(
buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)),
1, 0),
buildLocalityLbEndpoints("region3", "zone3", "subzone3",
ImmutableList.of(
buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNHEALTHY, 5)),
2, 1)),
ImmutableList.of(
buildDropOverload("lb", 200),
buildDropOverload("throttle", 1000)))));
DiscoveryResponse response =
buildDiscoveryResponse("5", clusterLoadAssignments,
ResourceType.EDS.typeUrl(), "004");
String expectedString = "{\n"
+ " \"versionInfo\": \"5\",\n"
+ " \"resources\": [{\n"
+ " \"@type\": \"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment\",\n"
+ " \"clusterName\": \"cluster-foo.googleapis.com\",\n"
+ " \"endpoints\": [{\n"
+ " \"locality\": {\n"
+ " \"region\": \"region1\",\n"
+ " \"zone\": \"zone1\",\n"
+ " \"subZone\": \"subzone1\"\n"
+ " },\n"
+ " \"lbEndpoints\": [{\n"
+ " \"endpoint\": {\n"
+ " \"address\": {\n"
+ " \"socketAddress\": {\n"
+ " \"address\": \"192.168.0.1\",\n"
+ " \"portValue\": 8080\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"healthStatus\": \"HEALTHY\",\n"
+ " \"loadBalancingWeight\": 2\n"
+ " }],\n"
+ " \"loadBalancingWeight\": 1\n"
+ " }, {\n"
+ " \"locality\": {\n"
+ " \"region\": \"region3\",\n"
+ " \"zone\": \"zone3\",\n"
+ " \"subZone\": \"subzone3\"\n"
+ " },\n"
+ " \"lbEndpoints\": [{\n"
+ " \"endpoint\": {\n"
+ " \"address\": {\n"
+ " \"socketAddress\": {\n"
+ " \"address\": \"192.168.142.5\",\n"
+ " \"portValue\": 80\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"healthStatus\": \"UNHEALTHY\",\n"
+ " \"loadBalancingWeight\": 5\n"
+ " }],\n"
+ " \"loadBalancingWeight\": 2,\n"
+ " \"priority\": 1\n"
+ " }],\n"
+ " \"policy\": {\n"
+ " \"dropOverloads\": [{\n"
+ " \"category\": \"lb\",\n"
+ " \"dropPercentage\": {\n"
+ " \"numerator\": 200,\n"
+ " \"denominator\": \"MILLION\"\n"
+ " }\n"
+ " }, {\n"
+ " \"category\": \"throttle\",\n"
+ " \"dropPercentage\": {\n"
+ " \"numerator\": 1000,\n"
+ " \"denominator\": \"MILLION\"\n"
+ " }\n"
+ " }]\n"
+ " }\n"
+ " }],\n"
+ " \"typeUrl\": \"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment\",\n"
+ " \"nonce\": \"004\"\n"
+ "}";
String res = printer.print(response);
assertThat(res).isEqualTo(expectedString);
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,819 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryResponseV2;
import static io.grpc.xds.XdsClientTestHelper.buildListenerV2;
import static io.grpc.xds.XdsClientTestHelper.buildRouteConfigurationV2;
import static io.grpc.xds.XdsClientTestHelper.buildVirtualHostV2;
import static org.junit.Assert.fail;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.UInt32Value;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.api.v2.Listener;
import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext;
import io.envoyproxy.envoy.api.v2.core.CidrRange;
import io.envoyproxy.envoy.api.v2.core.SocketAddress;
import io.envoyproxy.envoy.api.v2.core.TransportSocket;
import io.envoyproxy.envoy.api.v2.listener.Filter;
import io.envoyproxy.envoy.api.v2.listener.FilterChain;
import io.envoyproxy.envoy.api.v2.listener.FilterChainMatch;
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
import io.grpc.Context;
import io.grpc.Context.CancellationListener;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.internal.FakeClock.TaskFilter;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.EnvoyProtoData.Address;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ListenerUpdate;
import io.grpc.xds.XdsClient.ListenerWatcher;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
/**
* Tests for {@link ServerXdsClient}.
*/
@RunWith(JUnit4.class)
public class ServerXdsClientTest {
private static final int PORT = 7000;
private static final String LOCAL_IP = "192.168.3.5";
private static final String DIFFERENT_IP = "192.168.3.6";
private static final String TYPE_URL_HCM =
"type.googleapis.com/"
+ "envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager";
private static final Node NODE = Node.newBuilder().build();
private static final FakeClock.TaskFilter RPC_RETRY_TASK_FILTER =
new FakeClock.TaskFilter() {
@Override
public boolean shouldAccept(Runnable command) {
return command.toString().contains(AbstractXdsClient.RpcRetryTask.class.getSimpleName());
}
};
private static final TaskFilter LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER =
new TaskFilter() {
@Override
public boolean shouldAccept(Runnable command) {
return command.toString()
.contains(ServerXdsClient.ListenerResourceFetchTimeoutTask.class.getSimpleName());
}
};
private static final String LISTENER_NAME = "INBOUND_LISTENER";
@Rule
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw new AssertionError(e);
}
});
private final FakeClock fakeClock = new FakeClock();
private final Queue<StreamObserver<DiscoveryResponse>> responseObservers = new ArrayDeque<>();
private final Queue<StreamObserver<DiscoveryRequest>> requestObservers = new ArrayDeque<>();
private final AtomicBoolean callEnded = new AtomicBoolean(true);
@Mock
private AggregatedDiscoveryServiceImplBase mockedDiscoveryService;
@Mock
private BackoffPolicy.Provider backoffPolicyProvider;
@Mock
private BackoffPolicy backoffPolicy1;
@Mock
private BackoffPolicy backoffPolicy2;
@Mock
private ListenerWatcher listenerWatcher;
private ManagedChannel channel;
private ServerXdsClient xdsClient;
@Before
public void setUp() throws IOException {
MockitoAnnotations.initMocks(this);
when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L);
final String serverName = InProcessServerBuilder.generateName();
AggregatedDiscoveryServiceImplBase adsServiceImpl = new AggregatedDiscoveryServiceImplBase() {
@Override
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
final StreamObserver<DiscoveryResponse> responseObserver) {
assertThat(callEnded.get()).isTrue(); // ensure previous call was ended
callEnded.set(false);
Context.current().addListener(
new CancellationListener() {
@Override
public void cancelled(Context context) {
callEnded.set(true);
}
}, MoreExecutors.directExecutor());
responseObservers.offer(responseObserver);
@SuppressWarnings("unchecked")
StreamObserver<DiscoveryRequest> requestObserver = mock(StreamObserver.class);
requestObservers.offer(requestObserver);
return requestObserver;
}
};
mockedDiscoveryService =
mock(AggregatedDiscoveryServiceImplBase.class, delegatesTo(adsServiceImpl));
cleanupRule.register(
InProcessServerBuilder
.forName(serverName)
.addService(mockedDiscoveryService)
.directExecutor()
.build()
.start());
channel =
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
xdsClient =
new ServerXdsClient(new XdsChannel(channel, /* useProtocolV3= */ false), NODE,
syncContext, fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
fakeClock.getStopwatchSupplier());
// Only the connection to management server is established, no RPC request is sent until at
// least one watcher is registered.
assertThat(responseObservers).isEmpty();
assertThat(requestObservers).isEmpty();
}
@After
public void tearDown() {
xdsClient.shutdown();
assertThat(callEnded.get()).isTrue();
assertThat(channel.isShutdown()).isTrue();
assertThat(fakeClock.getPendingTasks()).isEmpty();
}
private static Node getNodeToVerify() {
Map<String, Object> newMetadata = new HashMap<>();
if (NODE.getMetadata() != null) {
newMetadata.putAll(NODE.getMetadata());
}
newMetadata.put("TRAFFICDIRECTOR_PROXYLESS", "1");
Address listeningAddress = new Address("0.0.0.0", PORT);
return NODE.toBuilder()
.setMetadata(newMetadata)
.addListeningAddresses(listeningAddress)
.build();
}
private static DiscoveryRequest buildDiscoveryRequest(
Node node, String versionInfo, String typeUrl, String nonce) {
return DiscoveryRequest.newBuilder()
.setVersionInfo(versionInfo)
.setNode(node.toEnvoyProtoNodeV2())
.setTypeUrl(typeUrl)
.setResponseNonce(nonce)
.build();
}
/** Error when 2 ListenerWatchers registered. */
@Test
public void ldsResponse_2listenerWatchers_expectError() {
xdsClient.watchListenerData(PORT, listenerWatcher);
try {
xdsClient.watchListenerData(80, listenerWatcher);
fail("expected exception");
} catch (IllegalStateException expected) {
assertThat(expected)
.hasMessageThat()
.isEqualTo("ListenerWatcher already registered");
}
}
/**
* Client receives an LDS response that contains listener with no match i.e. no port match.
*/
@Test
public void ldsResponse_nonMatchingFilterChain_notFoundError() {
xdsClient.watchListenerData(PORT, listenerWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
// Client sends an LDS request with null in lds resource name
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
ResourceType.LDS.typeUrlV2(), "")));
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
List<Any> listeners = ImmutableList.of(
Any.pack(buildListenerV2("bar.googleapis.com",
Any.pack(HttpConnectionManager.newBuilder()
.setRouteConfig(
buildRouteConfigurationV2("route-bar.googleapis.com",
ImmutableList.of(
buildVirtualHostV2(
ImmutableList.of("bar.googleapis.com"),
"cluster-bar.googleapis.com"))))
.build()))),
Any.pack(buildListenerV2(LISTENER_NAME,
Any.pack(HttpConnectionManager.newBuilder()
.setRouteConfig(
buildRouteConfigurationV2("route-baz.googleapis.com",
ImmutableList.of(
buildVirtualHostV2(
ImmutableList.of("baz.googleapis.com"),
"cluster-baz.googleapis.com"))))
.build()))));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", listeners, ResourceType.LDS.typeUrlV2(), "0000");
responseObserver.onNext(response);
// Client sends an ACK LDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "0000")));
verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
verify(listenerWatcher, never()).onResourceDoesNotExist(":" + PORT);
verify(listenerWatcher, never()).onError(any(Status.class));
fakeClock.forwardTime(ServerXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
verify(listenerWatcher).onResourceDoesNotExist(":" + PORT);
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}
/** Client receives a Listener with listener address and mismatched port. */
@Test
public void ldsResponseWith_listenerAddressPortMismatch() {
xdsClient.watchListenerData(PORT, listenerWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
// Client sends an LDS request with null in lds resource name
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
ResourceType.LDS.typeUrlV2(), "")));
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
.setPrefixLen(UInt32Value.of(32)).build()),
// Server is still speaking xds v2.
CommonTlsContextTestsUtil.buildTestDownstreamTlsContextV2("google-sds-config-default",
"ROOTCA"),
buildTestFilter("envoy.http_connection_manager"));
List<Any> listeners = ImmutableList.of(
Any.pack(buildListenerV2("bar.googleapis.com",
Any.pack(HttpConnectionManager.newBuilder()
.setRouteConfig(
buildRouteConfigurationV2("route-bar.googleapis.com",
ImmutableList.of(
buildVirtualHostV2(
ImmutableList.of("bar.googleapis.com"),
"cluster-bar.googleapis.com"))))
.build()))),
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
filterChainOutbound,
filterChainInbound
)));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", listeners, ResourceType.LDS.typeUrlV2(), "0000");
responseObserver.onNext(response);
// Client sends an ACK LDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "0000")));
verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
verify(listenerWatcher, never()).onResourceDoesNotExist(":" + PORT);
verify(listenerWatcher, never()).onError(any(Status.class));
fakeClock.forwardTime(ServerXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
verify(listenerWatcher).onResourceDoesNotExist(":" + PORT);
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}
/** Client receives a Listener with all match. */
@Test
public void ldsResponseWith_matchingListenerFound() throws InvalidProtocolBufferException {
xdsClient.watchListenerData(PORT, listenerWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
// Client sends an LDS request with null in lds resource name
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
ResourceType.LDS.typeUrlV2(), "")));
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
.setPrefixLen(UInt32Value.of(32)).build()),
// Server is still speaking xds v2.
CommonTlsContextTestsUtil.buildTestDownstreamTlsContextV2("google-sds-config-default",
"ROOTCA"),
buildTestFilter("envoy.http_connection_manager"));
List<Any> listeners = ImmutableList.of(
Any.pack(buildListenerV2("bar.googleapis.com",
Any.pack(HttpConnectionManager.newBuilder()
.setRouteConfig(
buildRouteConfigurationV2("route-bar.googleapis.com",
ImmutableList.of(
buildVirtualHostV2(
ImmutableList.of("bar.googleapis.com"),
"cluster-bar.googleapis.com"))))
.build()))),
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0",
filterChainOutbound,
filterChainInbound
)));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", listeners, ResourceType.LDS.typeUrlV2(), "0000");
responseObserver.onNext(response);
// Client sends an ACK LDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "0000")));
ArgumentCaptor<ListenerUpdate> listenerUpdateCaptor = ArgumentCaptor.forClass(null);
verify(listenerWatcher, times(1)).onListenerChanged(listenerUpdateCaptor.capture());
ListenerUpdate configUpdate = listenerUpdateCaptor.getValue();
EnvoyServerProtoData.Listener listener = configUpdate.getListener();
assertThat(listener.getName()).isEqualTo(LISTENER_NAME);
assertThat(listener.getAddress()).isEqualTo("0.0.0.0:" + PORT);
assertThat(listener.getFilterChains()).hasSize(2);
EnvoyServerProtoData.FilterChain filterChainOutboundInListenerUpdate
= listener.getFilterChains().get(0);
assertThat(filterChainOutboundInListenerUpdate.getFilterChainMatch().getDestinationPort())
.isEqualTo(8000);
EnvoyServerProtoData.FilterChain filterChainInboundInListenerUpdate
= listener.getFilterChains().get(1);
EnvoyServerProtoData.FilterChainMatch inBoundfilterChainMatch =
filterChainInboundInListenerUpdate.getFilterChainMatch();
assertThat(inBoundfilterChainMatch.getDestinationPort()).isEqualTo(PORT);
assertThat(inBoundfilterChainMatch.getPrefixRanges()).containsExactly(
new EnvoyServerProtoData.CidrRange(LOCAL_IP, 32));
CommonTlsContext downstreamCommonTlsContext =
filterChainInboundInListenerUpdate.getDownstreamTlsContext().getCommonTlsContext();
assertThat(downstreamCommonTlsContext.getTlsCertificateSdsSecretConfigs(0).getName())
.isEqualTo("google-sds-config-default");
assertThat(
downstreamCommonTlsContext
.getCombinedValidationContext()
.getValidationContextSdsSecretConfig()
.getName())
.isEqualTo("ROOTCA");
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}
/** Client receives LDS responses for updating Listener previously received. */
@Test
public void notifyUpdatedListener() throws InvalidProtocolBufferException {
xdsClient.watchListenerData(PORT, listenerWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
// Client sends an LDS request with null in lds resource name
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
ResourceType.LDS.typeUrlV2(), "")));
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
.setPrefixLen(UInt32Value.of(32)).build()),
// Server is still speaking xds v2.
CommonTlsContextTestsUtil.buildTestDownstreamTlsContextV2("google-sds-config-default",
"ROOTCA"),
buildTestFilter("envoy.http_connection_manager"));
List<Any> listeners = ImmutableList.of(
Any.pack(buildListenerV2("bar.googleapis.com",
Any.pack(HttpConnectionManager.newBuilder()
.setRouteConfig(
buildRouteConfigurationV2("route-bar.googleapis.com",
ImmutableList.of(
buildVirtualHostV2(
ImmutableList.of("bar.googleapis.com"),
"cluster-bar.googleapis.com"))))
.build()))),
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0",
filterChainOutbound,
filterChainInbound
)));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", listeners, ResourceType.LDS.typeUrlV2(), "0000");
responseObserver.onNext(response);
// Client sends an ACK LDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "0000")));
ArgumentCaptor<ListenerUpdate> listenerUpdateCaptor = ArgumentCaptor.forClass(null);
verify(listenerWatcher, times(1)).onListenerChanged(listenerUpdateCaptor.capture());
// Management sends back another LDS response containing updates for the requested Listener.
final FilterChain filterChainNewInbound = buildFilterChain(buildFilterChainMatch(PORT,
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
.setPrefixLen(UInt32Value.of(32)).build()),
CommonTlsContextTestsUtil.buildTestDownstreamTlsContextV2("google-sds-config-default1",
"ROOTCA2"),
buildTestFilter("envoy.http_connection_manager"));
List<Any> listeners1 = ImmutableList.of(
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0",
filterChainNewInbound
)));
DiscoveryResponse response1 =
buildDiscoveryResponseV2("1", listeners1, ResourceType.LDS.typeUrlV2(), "0001");
responseObserver.onNext(response1);
// Client sends an ACK LDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1",
ResourceType.LDS.typeUrlV2(), "0001")));
// Updated listener is notified to config watcher.
listenerUpdateCaptor = ArgumentCaptor.forClass(null);
verify(listenerWatcher, times(2)).onListenerChanged(listenerUpdateCaptor.capture());
ListenerUpdate configUpdate = listenerUpdateCaptor.getValue();
EnvoyServerProtoData.Listener listener = configUpdate.getListener();
assertThat(listener.getName()).isEqualTo(LISTENER_NAME);
assertThat(listener.getFilterChains()).hasSize(1);
EnvoyServerProtoData.FilterChain filterChain =
Iterables.getOnlyElement(listener.getFilterChains());
EnvoyServerProtoData.FilterChainMatch filterChainMatch = filterChain.getFilterChainMatch();
assertThat(filterChainMatch.getDestinationPort()).isEqualTo(PORT);
assertThat(filterChainMatch.getPrefixRanges()).containsExactly(
new EnvoyServerProtoData.CidrRange(LOCAL_IP, 32));
CommonTlsContext downstreamCommonTlsContext =
filterChain.getDownstreamTlsContext().getCommonTlsContext();
assertThat(downstreamCommonTlsContext.getTlsCertificateSdsSecretConfigs(0).getName())
.isEqualTo("google-sds-config-default1");
assertThat(
downstreamCommonTlsContext
.getCombinedValidationContext()
.getValidationContextSdsSecretConfig()
.getName())
.isEqualTo("ROOTCA2");
}
/**
* Client receives LDS response containing matching name but non-matching IP address. Test
* disabled until IP matching logic implemented.
*/
@Ignore
@Test
public void ldsResponse_nonMatchingIpAddress() {
xdsClient.watchListenerData(PORT, listenerWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
// Client sends an LDS request with null in lds resource name
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
ResourceType.LDS.typeUrlV2(), "")));
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(8000), null);
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(PORT,
CidrRange.newBuilder().setAddressPrefix(DIFFERENT_IP)
.setPrefixLen(UInt32Value.of(32)).build()),
// Server is still speaking xds v2.
CommonTlsContextTestsUtil.buildTestDownstreamTlsContextV2("google-sds-config-default",
"ROOTCA"),
buildTestFilter("envoy.http_connection_manager"));
List<Any> listeners = ImmutableList.of(
Any.pack(buildListenerV2("bar.googleapis.com",
Any.pack(HttpConnectionManager.newBuilder()
.setRouteConfig(
buildRouteConfigurationV2("route-bar.googleapis.com",
ImmutableList.of(
buildVirtualHostV2(
ImmutableList.of("bar.googleapis.com"),
"cluster-bar.googleapis.com"))))
.build()))),
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
filterChainInbound,
filterChainOutbound
)));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", listeners, ResourceType.LDS.typeUrlV2(), "0000");
responseObserver.onNext(response);
// Client sends an ACK LDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "0000")));
verify(listenerWatcher, never()).onError(any(Status.class));
verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
}
/** Client receives LDS response containing non-matching port in the filterMatch. */
@Test
public void ldsResponse_nonMatchingPort() {
xdsClient.watchListenerData(PORT, listenerWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
// Client sends an LDS request with null in lds resource name
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
ResourceType.LDS.typeUrlV2(), "")));
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(8000), null);
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(
PORT + 1, // add 1 to mismatch
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
.setPrefixLen(UInt32Value.of(32)).build()),
// Server is still speaking xds v2.
CommonTlsContextTestsUtil.buildTestDownstreamTlsContextV2("google-sds-config-default",
"ROOTCA"),
buildTestFilter("envoy.http_connection_manager"));
List<Any> listeners = ImmutableList.of(
Any.pack(buildListenerV2("bar.googleapis.com",
Any.pack(HttpConnectionManager.newBuilder()
.setRouteConfig(
buildRouteConfigurationV2("route-bar.googleapis.com",
ImmutableList.of(
buildVirtualHostV2(
ImmutableList.of("bar.googleapis.com"),
"cluster-bar.googleapis.com"))))
.build()))),
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0",
filterChainInbound,
filterChainOutbound
)));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", listeners, ResourceType.LDS.typeUrlV2(), "0000");
responseObserver.onNext(response);
// Client sends an ACK LDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "0000")));
verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
verify(listenerWatcher, never()).onResourceDoesNotExist(":" + PORT);
verify(listenerWatcher, never()).onError(any(Status.class));
fakeClock.forwardTime(ServerXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
verify(listenerWatcher).onResourceDoesNotExist(":" + PORT);
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}
/**
* RPC stream close and retry while there is listener watcher registered.
*/
@Test
public void streamClosedAndRetry() {
InOrder inOrder =
Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
xdsClient.watchListenerData(PORT, listenerWatcher);
ArgumentCaptor<StreamObserver<DiscoveryResponse>> responseObserverCaptor =
ArgumentCaptor.forClass(null);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
StreamObserver<DiscoveryResponse> responseObserver =
responseObserverCaptor.getValue(); // same as responseObservers.poll()
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
ResourceType.LDS.typeUrlV2(), "")));
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
.setPrefixLen(UInt32Value.of(32)).build()),
// Server is still speaking xds v2.
CommonTlsContextTestsUtil.buildTestDownstreamTlsContextV2("google-sds-config-default",
"ROOTCA"),
buildTestFilter("envoy.http_connection_manager"));
List<Any> listeners = ImmutableList.of(
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
filterChainOutbound,
filterChainInbound
)));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", listeners, ResourceType.LDS.typeUrlV2(), "0000");
responseObserver.onNext(response);
// Client sent an ACK CDS request (Omitted).
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(null);
// Management server closes the RPC stream with an error.
responseObserver.onError(Status.UNKNOWN.asException());
verify(listenerWatcher).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
// Resets backoff and retry immediately.
inOrder.verify(backoffPolicyProvider).get();
fakeClock.runDueTasks();
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
// Retry resumes requests for all wanted resources.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "")));
// Management server becomes unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
verify(listenerWatcher, times(2)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// Retry after backoff.
fakeClock.forwardNanos(9L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "")));
// Management server is still not reachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
verify(listenerWatcher, times(3)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// Retry after backoff.
fakeClock.forwardNanos(99L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "")));
// Management server sends back a LDS response.
response = buildDiscoveryResponseV2("1", listeners,
ResourceType.LDS.typeUrlV2(), "0001");
responseObserver.onNext(response);
// Client sent an LDS ACK request (Omitted).
// Management server closes the RPC stream.
responseObserver.onCompleted();
verify(listenerWatcher, times(4)).onError(any(Status.class));
// Resets backoff and retry immediately
inOrder.verify(backoffPolicyProvider).get();
fakeClock.runDueTasks();
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1",
ResourceType.LDS.typeUrlV2(), "")));
// Management server becomes unreachable again.
responseObserver.onError(Status.UNAVAILABLE.asException());
verify(listenerWatcher, times(5)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
inOrder.verify(backoffPolicy2).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// Retry after backoff.
fakeClock.forwardNanos(19L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1",
ResourceType.LDS.typeUrlV2(), "")));
verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
}
static Listener buildListenerWithFilterChain(String name, int portValue, String address,
FilterChain... filterChains) {
io.envoyproxy.envoy.api.v2.core.Address listenerAddress =
io.envoyproxy.envoy.api.v2.core.Address.newBuilder()
.setSocketAddress(
SocketAddress.newBuilder().setPortValue(portValue).setAddress(address))
.build();
return
Listener.newBuilder()
.setName(name)
.setAddress(listenerAddress)
.addAllFilterChains(Arrays.asList(filterChains))
.build();
}
@SuppressWarnings("deprecation")
static FilterChain buildFilterChain(FilterChainMatch filterChainMatch,
DownstreamTlsContext tlsContext, Filter...filters) {
return FilterChain.newBuilder()
.setFilterChainMatch(filterChainMatch)
.setTransportSocket(
tlsContext == null
? TransportSocket.getDefaultInstance()
: TransportSocket.newBuilder()
.setName("envoy.transport_sockets.tls")
.setTypedConfig(Any.pack(tlsContext))
.build())
.addAllFilters(Arrays.asList(filters))
.build();
}
static FilterChainMatch buildFilterChainMatch(int destPort, CidrRange...prefixRanges) {
return
FilterChainMatch.newBuilder()
.setDestinationPort(UInt32Value.of(destPort))
.addAllPrefixRanges(Arrays.asList(prefixRanges))
.build();
}
static Filter buildTestFilter(String name) {
return
Filter.newBuilder()
.setName(name)
.setTypedConfig(
Any.newBuilder()
.setTypeUrl(TYPE_URL_HCM))
.build();
}
}