xds: refactor XdsClient in preparation to support federation (#8630)

See go/java-xds-client-api-for-federation for detailed description
This commit is contained in:
ZHANG Dapeng 2021-11-01 09:44:58 -07:00 committed by GitHub
parent 14eb3b265f
commit a46560e4fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 363 additions and 236 deletions

View File

@ -28,6 +28,7 @@ import com.google.rpc.Code;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.InternalLogId;
import io.grpc.ManagedChannel;
@ -36,6 +37,11 @@ import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.ClientXdsClient.XdsChannelFactory;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ResourceStore;
import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.util.Collection;
import java.util.Collections;
@ -48,7 +54,7 @@ import javax.annotation.Nullable;
* Common base type for XdsClient implementations, which encapsulates the layer abstraction of
* the xDS RPC stream.
*/
abstract class AbstractXdsClient extends XdsClient {
final class AbstractXdsClient {
private static final String ADS_TYPE_URL_LDS_V2 = "type.googleapis.com/envoy.api.v2.Listener";
private static final String ADS_TYPE_URL_LDS =
@ -66,26 +72,18 @@ abstract class AbstractXdsClient extends XdsClient {
private static final String ADS_TYPE_URL_EDS =
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
getLogger().log(
XdsLogLevel.ERROR,
"Uncaught exception in XdsClient SynchronizationContext. Panic!",
e);
// TODO(chengyuanzhang): better error handling.
throw new AssertionError(e);
}
});
private final SynchronizationContext syncContext;
private final InternalLogId logId;
private final XdsLogger logger;
private final ServerInfo serverInfo;
private final ManagedChannel channel;
private final XdsResponseHandler xdsResponseHandler;
private final ResourceStore resourceStore;
private final Context context;
private final ScheduledExecutorService timeService;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Stopwatch stopwatch;
private final Bootstrapper.BootstrapInfo bootstrapInfo;
private final Node bootstrapNode;
// Last successfully applied version_info for each resource type. Starts with empty string.
// A version_info is used to update management server with client's most recent knowledge of
@ -103,71 +101,42 @@ abstract class AbstractXdsClient extends XdsClient {
@Nullable
private ScheduledHandle rpcRetryTimer;
AbstractXdsClient(ManagedChannel channel, Bootstrapper.BootstrapInfo bootstrapInfo,
Context context, ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier) {
this.channel = checkNotNull(channel, "channel");
this.bootstrapInfo = checkNotNull(bootstrapInfo, "bootstrapInfo");
/** An entity that manages ADS RPCs over a single channel. */
// TODO: rename to XdsChannel
AbstractXdsClient(
XdsChannelFactory xdsChannelFactory,
ServerInfo serverInfo,
Node bootstrapNode,
XdsResponseHandler xdsResponseHandler,
ResourceStore resourceStore,
Context context,
ScheduledExecutorService
timeService,
SynchronizationContext syncContext,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
this.serverInfo = checkNotNull(serverInfo, "serverInfo");
this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo);
this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber");
this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
this.context = checkNotNull(context, "context");
this.timeService = checkNotNull(timeService, "timeService");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
logId = InternalLogId.allocate("xds-client", null);
logId = InternalLogId.allocate("xds-client", serverInfo.target());
logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created");
}
/**
* Called when an LDS response is received.
*/
// Must be synchronized.
protected void handleLdsResponse(String versionInfo, List<Any> resources, String nonce) {
/** The underlying channel. */
// Currently, only externally used for LrsClient.
Channel channel() {
return channel;
}
/**
* Called when a RDS response is received.
*/
// Must be synchronized.
protected void handleRdsResponse(String versionInfo, List<Any> resources, String nonce) {
}
/**
* Called when a CDS response is received.
*/
// Must be synchronized.
protected void handleCdsResponse(String versionInfo, List<Any> resources, String nonce) {
}
/**
* Called when an EDS response is received.
*/
// Must be synchronized.
protected void handleEdsResponse(String versionInfo, List<Any> resources, String nonce) {
}
/**
* Called when the ADS stream is closed passively.
*/
// Must be synchronized.
protected void handleStreamClosed(Status error) {
}
/**
* Called when the ADS stream has been recreated.
*/
// Must be synchronized.
protected void handleStreamRestarted() {
}
/**
* Called when being shut down.
*/
// Must be synchronized.
protected void handleShutdown() {
}
@Override
final void shutdown() {
void shutdown() {
syncContext.execute(new Runnable() {
@Override
public void run() {
@ -179,49 +148,28 @@ abstract class AbstractXdsClient extends XdsClient {
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
rpcRetryTimer.cancel();
}
handleShutdown();
channel.shutdown();
}
});
}
@Override
boolean isShutDown() {
return shutdown;
}
@Override
Bootstrapper.BootstrapInfo getBootstrapInfo() {
return bootstrapInfo;
}
@Override
public String toString() {
return logId.toString();
}
/**
* Returns the collection of resources currently subscribing to or {@code null} if not
* subscribing to any resources for the given type.
*
* <p>Note an empty collection indicates subscribing to resources of the given type with
* wildcard mode.
*/
// Must be synchronized.
@Nullable
abstract Collection<String> getSubscribedResources(ResourceType type);
/**
* Updates the resource subscription for the given resource type.
*/
// Must be synchronized.
protected final void adjustResourceSubscription(ResourceType type) {
void adjustResourceSubscription(ResourceType type) {
if (isInBackoff()) {
return;
}
if (adsStream == null) {
startRpcStream();
}
Collection<String> resources = getSubscribedResources(type);
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
if (resources != null) {
adsStream.sendDiscoveryRequest(type, resources);
}
@ -232,7 +180,7 @@ abstract class AbstractXdsClient extends XdsClient {
* and sends an ACK request to the management server.
*/
// Must be synchronized.
protected final void ackResponse(ResourceType type, String versionInfo, String nonce) {
void ackResponse(ResourceType type, String versionInfo, String nonce) {
switch (type) {
case LDS:
ldsVersion = versionInfo;
@ -252,7 +200,7 @@ abstract class AbstractXdsClient extends XdsClient {
}
logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
type, nonce, versionInfo);
Collection<String> resources = getSubscribedResources(type);
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
if (resources == null) {
resources = Collections.emptyList();
}
@ -264,34 +212,22 @@ abstract class AbstractXdsClient extends XdsClient {
* accepted version) to the management server.
*/
// Must be synchronized.
protected final void nackResponse(ResourceType type, String nonce, String errorDetail) {
void nackResponse(ResourceType type, String nonce, String errorDetail) {
String versionInfo = getCurrentVersion(type);
logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
type, nonce, versionInfo);
Collection<String> resources = getSubscribedResources(type);
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
if (resources == null) {
resources = Collections.emptyList();
}
adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail);
}
protected final SynchronizationContext getSyncContext() {
return syncContext;
}
protected final ScheduledExecutorService getTimeService() {
return timeService;
}
protected final XdsLogger getLogger() {
return logger;
}
/**
* Returns {@code true} if the resource discovery is currently in backoff.
*/
// Must be synchronized.
protected final boolean isInBackoff() {
boolean isInBackoff() {
return rpcRetryTimer != null && rpcRetryTimer.isPending();
}
@ -302,7 +238,7 @@ abstract class AbstractXdsClient extends XdsClient {
// Must be synchronized.
private void startRpcStream() {
checkState(adsStream == null, "Previous adsStream has not been cleared yet");
if (bootstrapInfo.servers().get(0).useProtocolV3()) {
if (serverInfo.useProtocolV3()) {
adsStream = new AdsStreamV3();
} else {
adsStream = new AdsStreamV2();
@ -317,8 +253,8 @@ abstract class AbstractXdsClient extends XdsClient {
stopwatch.reset().start();
}
/** Returns the latest accepted version of the given resource type. */
// Must be synchronized.
@Override
String getCurrentVersion(ResourceType type) {
String version;
switch (type) {
@ -353,16 +289,16 @@ abstract class AbstractXdsClient extends XdsClient {
if (type == ResourceType.UNKNOWN) {
continue;
}
Collection<String> resources = getSubscribedResources(type);
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
if (resources != null) {
adsStream.sendDiscoveryRequest(type, resources);
}
}
handleStreamRestarted();
xdsResponseHandler.handleStreamRestarted(serverInfo);
}
}
protected enum ResourceType {
enum ResourceType {
UNKNOWN, LDS, RDS, CDS, EDS;
String typeUrl() {
@ -488,19 +424,19 @@ abstract class AbstractXdsClient extends XdsClient {
switch (type) {
case LDS:
ldsRespNonce = nonce;
handleLdsResponse(versionInfo, resources, nonce);
xdsResponseHandler.handleLdsResponse(serverInfo, versionInfo, resources, nonce);
break;
case RDS:
rdsRespNonce = nonce;
handleRdsResponse(versionInfo, resources, nonce);
xdsResponseHandler.handleRdsResponse(serverInfo, versionInfo, resources, nonce);
break;
case CDS:
cdsRespNonce = nonce;
handleCdsResponse(versionInfo, resources, nonce);
xdsResponseHandler.handleCdsResponse(serverInfo, versionInfo, resources, nonce);
break;
case EDS:
edsRespNonce = nonce;
handleEdsResponse(versionInfo, resources, nonce);
xdsResponseHandler.handleEdsResponse(serverInfo, versionInfo, resources, nonce);
break;
case UNKNOWN:
default:
@ -526,7 +462,7 @@ abstract class AbstractXdsClient extends XdsClient {
"ADS stream closed with status {0}: {1}. Cause: {2}",
error.getCode(), error.getDescription(), error.getCause());
closed = true;
handleStreamClosed(error);
xdsResponseHandler.handleStreamClosed(error);
cleanUp();
if (responseReceived || retryBackoffPolicy == null) {
// Reset the backoff sequence if had received a response, or backoff sequence
@ -619,7 +555,7 @@ abstract class AbstractXdsClient extends XdsClient {
io.envoyproxy.envoy.api.v2.DiscoveryRequest.Builder builder =
io.envoyproxy.envoy.api.v2.DiscoveryRequest.newBuilder()
.setVersionInfo(versionInfo)
.setNode(bootstrapInfo.node().toEnvoyProtoNodeV2())
.setNode(bootstrapNode.toEnvoyProtoNodeV2())
.addAllResourceNames(resources)
.setTypeUrl(type.typeUrlV2())
.setResponseNonce(nonce);
@ -699,7 +635,7 @@ abstract class AbstractXdsClient extends XdsClient {
DiscoveryRequest.Builder builder =
DiscoveryRequest.newBuilder()
.setVersionInfo(versionInfo)
.setNode(bootstrapInfo.node().toEnvoyProtoNode())
.setNode(bootstrapNode.toEnvoyProtoNode())
.addAllResourceNames(resources)
.setTypeUrl(type.typeUrl())
.setResponseNonce(nonce);

View File

@ -27,6 +27,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.Any;
import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
@ -56,14 +57,20 @@ import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext;
import io.envoyproxy.envoy.type.v3.FractionalPercent;
import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType;
import io.grpc.ChannelCredentials;
import io.grpc.Context;
import io.grpc.EquivalentAddressGroup;
import io.grpc.Grpc;
import io.grpc.InternalLogId;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.TimeProvider;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.Endpoints.LbEndpoint;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
@ -85,6 +92,8 @@ import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
import io.grpc.xds.VirtualHost.Route.RouteMatch;
import io.grpc.xds.VirtualHost.Route.RouteMatch.PathMatcher;
import io.grpc.xds.XdsClient.ResourceStore;
import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.internal.Matchers.FractionMatcher;
import io.grpc.xds.internal.Matchers.HeaderMatcher;
@ -111,7 +120,7 @@ import javax.annotation.Nullable;
/**
* XdsClient implementation for client side usages.
*/
final class ClientXdsClient extends AbstractXdsClient {
final class ClientXdsClient extends XdsClient implements XdsResponseHandler, ResourceStore {
// Longest time to wait, since the subscription to some resource, for concluding its absence.
@VisibleForTesting
@ -164,33 +173,90 @@ final class ClientXdsClient extends AbstractXdsClient {
Code.CANCELLED, Code.DEADLINE_EXCEEDED, Code.INTERNAL, Code.RESOURCE_EXHAUSTED,
Code.UNAVAILABLE));
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.log(
XdsLogLevel.ERROR,
"Uncaught exception in XdsClient SynchronizationContext. Panic!",
e);
// TODO(chengyuanzhang): better error handling.
throw new AssertionError(e);
}
});
private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry();
private final Map<ServerInfo, AbstractXdsClient> serverChannelMap = new HashMap<>();
private final Map<String, ResourceSubscriber> ldsResourceSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> rdsResourceSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> cdsResourceSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> edsResourceSubscribers = new HashMap<>();
private final LoadStatsManager2 loadStatsManager;
private final LoadReportClient lrsClient;
private final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
private final XdsChannelFactory xdsChannelFactory;
private final Bootstrapper.BootstrapInfo bootstrapInfo;
private final Context context;
private final ScheduledExecutorService timeService;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Supplier<Stopwatch> stopwatchSupplier;
private final TimeProvider timeProvider;
private boolean reportingLoad;
private final TlsContextManager tlsContextManager;
private final InternalLogId logId;
private final XdsLogger logger;
private volatile boolean isShutdown;
// TODO(zdapeng): rename to XdsClientImpl
ClientXdsClient(
ManagedChannel channel, Bootstrapper.BootstrapInfo bootstrapInfo, Context context,
ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier, TimeProvider timeProvider,
XdsChannelFactory xdsChannelFactory,
Bootstrapper.BootstrapInfo bootstrapInfo,
Context context,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier,
TimeProvider timeProvider,
TlsContextManager tlsContextManager) {
super(channel, bootstrapInfo, context, timeService, backoffPolicyProvider, stopwatchSupplier);
this.xdsChannelFactory = xdsChannelFactory;
this.bootstrapInfo = bootstrapInfo;
this.context = context;
this.timeService = timeService;
loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
this.backoffPolicyProvider = backoffPolicyProvider;
this.stopwatchSupplier = stopwatchSupplier;
this.timeProvider = timeProvider;
this.tlsContextManager = checkNotNull(tlsContextManager, "tlsContextManager");
lrsClient = new LoadReportClient(loadStatsManager, channel, context,
bootstrapInfo.servers().get(0).useProtocolV3(), bootstrapInfo.node(),
getSyncContext(), timeService, backoffPolicyProvider, stopwatchSupplier);
logId = InternalLogId.allocate("xds-client", null);
logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created");
}
private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) {
syncContext.throwIfNotInThisSynchronizationContext();
if (serverChannelMap.containsKey(serverInfo)) {
return;
}
AbstractXdsClient xdsChannel = new AbstractXdsClient(
xdsChannelFactory,
serverInfo,
bootstrapInfo.node(),
this,
this,
context,
timeService,
syncContext,
backoffPolicyProvider,
stopwatchSupplier);
LoadReportClient lrsClient = new LoadReportClient(
loadStatsManager, xdsChannel.channel(), context, serverInfo.useProtocolV3(),
bootstrapInfo.node(), syncContext, timeService, backoffPolicyProvider, stopwatchSupplier);
serverChannelMap.put(serverInfo, xdsChannel);
serverLrsClientMap.put(serverInfo, lrsClient);
}
@Override
protected void handleLdsResponse(String versionInfo, List<Any> resources, String nonce) {
public void handleLdsResponse(
ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce) {
syncContext.throwIfNotInThisSynchronizationContext();
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
@ -233,12 +299,12 @@ final class ClientXdsClient extends AbstractXdsClient {
// LdsUpdate parsed successfully.
parsedResources.put(listenerName, new ParsedResource(ldsUpdate, resource));
}
getLogger().log(XdsLogLevel.INFO,
logger.log(XdsLogLevel.INFO,
"Received LDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);
handleResourceUpdate(
ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, versionInfo,
nonce, errors);
serverInfo, ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources,
versionInfo, nonce, errors);
}
private LdsUpdate processClientSideListener(
@ -1307,7 +1373,9 @@ final class ClientXdsClient extends AbstractXdsClient {
}
@Override
protected void handleRdsResponse(String versionInfo, List<Any> resources, String nonce) {
public void handleRdsResponse(
ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce) {
syncContext.throwIfNotInThisSynchronizationContext();
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
@ -1344,12 +1412,12 @@ final class ClientXdsClient extends AbstractXdsClient {
parsedResources.put(routeConfigName, new ParsedResource(rdsUpdate, resource));
}
getLogger().log(XdsLogLevel.INFO,
logger.log(XdsLogLevel.INFO,
"Received RDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);
handleResourceUpdate(
ResourceType.RDS, parsedResources, invalidResources, Collections.<String>emptySet(),
versionInfo, nonce, errors);
serverInfo, ResourceType.RDS, parsedResources, invalidResources,
Collections.<String>emptySet(), versionInfo, nonce, errors);
}
private static RdsUpdate processRouteConfiguration(
@ -1370,7 +1438,9 @@ final class ClientXdsClient extends AbstractXdsClient {
}
@Override
protected void handleCdsResponse(String versionInfo, List<Any> resources, String nonce) {
public void handleCdsResponse(
ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce) {
syncContext.throwIfNotInThisSynchronizationContext();
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
@ -1415,12 +1485,12 @@ final class ClientXdsClient extends AbstractXdsClient {
}
parsedResources.put(clusterName, new ParsedResource(cdsUpdate, resource));
}
getLogger().log(XdsLogLevel.INFO,
logger.log(XdsLogLevel.INFO,
"Received CDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);
handleResourceUpdate(
ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources, versionInfo,
nonce, errors);
serverInfo, ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources,
versionInfo, nonce, errors);
}
@VisibleForTesting
@ -1598,7 +1668,9 @@ final class ClientXdsClient extends AbstractXdsClient {
}
@Override
protected void handleEdsResponse(String versionInfo, List<Any> resources, String nonce) {
public void handleEdsResponse(
ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce) {
syncContext.throwIfNotInThisSynchronizationContext();
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
@ -1641,12 +1713,12 @@ final class ClientXdsClient extends AbstractXdsClient {
}
parsedResources.put(clusterName, new ParsedResource(edsUpdate, resource));
}
getLogger().log(
logger.log(
XdsLogLevel.INFO, "Received EDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);
handleResourceUpdate(
ResourceType.EDS, parsedResources, invalidResources, Collections.<String>emptySet(),
versionInfo, nonce, errors);
serverInfo, ResourceType.EDS, parsedResources, invalidResources,
Collections.<String>emptySet(), versionInfo, nonce, errors);
}
private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment)
@ -1775,7 +1847,8 @@ final class ClientXdsClient extends AbstractXdsClient {
}
@Override
protected void handleStreamClosed(Status error) {
public void handleStreamClosed(Status error) {
syncContext.throwIfNotInThisSynchronizationContext();
cleanUpResourceTimers();
for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) {
subscriber.onError(error);
@ -1792,28 +1865,57 @@ final class ClientXdsClient extends AbstractXdsClient {
}
@Override
protected void handleStreamRestarted() {
public void handleStreamRestarted(ServerInfo serverInfo) {
syncContext.throwIfNotInThisSynchronizationContext();
for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) {
if (subscriber.serverInfo.equals(serverInfo)) {
subscriber.restartTimer();
}
}
for (ResourceSubscriber subscriber : rdsResourceSubscribers.values()) {
if (subscriber.serverInfo.equals(serverInfo)) {
subscriber.restartTimer();
}
}
for (ResourceSubscriber subscriber : cdsResourceSubscribers.values()) {
if (subscriber.serverInfo.equals(serverInfo)) {
subscriber.restartTimer();
}
}
for (ResourceSubscriber subscriber : edsResourceSubscribers.values()) {
if (subscriber.serverInfo.equals(serverInfo)) {
subscriber.restartTimer();
}
}
}
@Override
protected void handleShutdown() {
void shutdown() {
syncContext.execute(
new Runnable() {
@Override
public void run() {
if (isShutdown) {
return;
}
isShutdown = true;
for (AbstractXdsClient xdsChannel : serverChannelMap.values()) {
xdsChannel.shutdown();
}
if (reportingLoad) {
for (final LoadReportClient lrsClient : serverLrsClientMap.values()) {
lrsClient.stopLoadReporting();
}
}
cleanUpResourceTimers();
}
});
}
@Override
boolean isShutDown() {
return isShutdown;
}
private Map<String, ResourceSubscriber> getSubscribedResourcesMap(ResourceType type) {
switch (type) {
@ -1833,9 +1935,16 @@ final class ClientXdsClient extends AbstractXdsClient {
@Nullable
@Override
Collection<String> getSubscribedResources(ResourceType type) {
public Collection<String> getSubscribedResources(ServerInfo serverInfo, ResourceType type) {
Map<String, ResourceSubscriber> resources = getSubscribedResourcesMap(type);
return resources.isEmpty() ? null : resources.keySet();
ImmutableSet.Builder<String> builder = ImmutableSet.builder();
for (String key : resources.keySet()) {
if (resources.get(key).serverInfo.equals(serverInfo)) {
builder.add(key);
}
}
Collection<String> retVal = builder.build();
return retVal.isEmpty() ? null : retVal;
}
@Override
@ -1854,15 +1963,15 @@ final class ClientXdsClient extends AbstractXdsClient {
@Override
void watchLdsResource(final String resourceName, final LdsResourceWatcher watcher) {
getSyncContext().execute(new Runnable() {
syncContext.execute(new Runnable() {
@Override
public void run() {
ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName);
if (subscriber == null) {
getLogger().log(XdsLogLevel.INFO, "Subscribe LDS resource {0}", resourceName);
logger.log(XdsLogLevel.INFO, "Subscribe LDS resource {0}", resourceName);
subscriber = new ResourceSubscriber(ResourceType.LDS, resourceName);
ldsResourceSubscribers.put(resourceName, subscriber);
adjustResourceSubscription(ResourceType.LDS);
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.LDS);
}
subscriber.addWatcher(watcher);
}
@ -1871,16 +1980,16 @@ final class ClientXdsClient extends AbstractXdsClient {
@Override
void cancelLdsResourceWatch(final String resourceName, final LdsResourceWatcher watcher) {
getSyncContext().execute(new Runnable() {
syncContext.execute(new Runnable() {
@Override
public void run() {
ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
getLogger().log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName);
logger.log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName);
ldsResourceSubscribers.remove(resourceName);
adjustResourceSubscription(ResourceType.LDS);
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.LDS);
}
}
});
@ -1888,15 +1997,15 @@ final class ClientXdsClient extends AbstractXdsClient {
@Override
void watchRdsResource(final String resourceName, final RdsResourceWatcher watcher) {
getSyncContext().execute(new Runnable() {
syncContext.execute(new Runnable() {
@Override
public void run() {
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName);
if (subscriber == null) {
getLogger().log(XdsLogLevel.INFO, "Subscribe RDS resource {0}", resourceName);
logger.log(XdsLogLevel.INFO, "Subscribe RDS resource {0}", resourceName);
subscriber = new ResourceSubscriber(ResourceType.RDS, resourceName);
rdsResourceSubscribers.put(resourceName, subscriber);
adjustResourceSubscription(ResourceType.RDS);
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.RDS);
}
subscriber.addWatcher(watcher);
}
@ -1905,16 +2014,16 @@ final class ClientXdsClient extends AbstractXdsClient {
@Override
void cancelRdsResourceWatch(final String resourceName, final RdsResourceWatcher watcher) {
getSyncContext().execute(new Runnable() {
syncContext.execute(new Runnable() {
@Override
public void run() {
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
getLogger().log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName);
logger.log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName);
rdsResourceSubscribers.remove(resourceName);
adjustResourceSubscription(ResourceType.RDS);
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.RDS);
}
}
});
@ -1922,15 +2031,15 @@ final class ClientXdsClient extends AbstractXdsClient {
@Override
void watchCdsResource(final String resourceName, final CdsResourceWatcher watcher) {
getSyncContext().execute(new Runnable() {
syncContext.execute(new Runnable() {
@Override
public void run() {
ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName);
if (subscriber == null) {
getLogger().log(XdsLogLevel.INFO, "Subscribe CDS resource {0}", resourceName);
logger.log(XdsLogLevel.INFO, "Subscribe CDS resource {0}", resourceName);
subscriber = new ResourceSubscriber(ResourceType.CDS, resourceName);
cdsResourceSubscribers.put(resourceName, subscriber);
adjustResourceSubscription(ResourceType.CDS);
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.CDS);
}
subscriber.addWatcher(watcher);
}
@ -1939,16 +2048,16 @@ final class ClientXdsClient extends AbstractXdsClient {
@Override
void cancelCdsResourceWatch(final String resourceName, final CdsResourceWatcher watcher) {
getSyncContext().execute(new Runnable() {
syncContext.execute(new Runnable() {
@Override
public void run() {
ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
getLogger().log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName);
logger.log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName);
cdsResourceSubscribers.remove(resourceName);
adjustResourceSubscription(ResourceType.CDS);
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.CDS);
}
}
});
@ -1956,15 +2065,15 @@ final class ClientXdsClient extends AbstractXdsClient {
@Override
void watchEdsResource(final String resourceName, final EdsResourceWatcher watcher) {
getSyncContext().execute(new Runnable() {
syncContext.execute(new Runnable() {
@Override
public void run() {
ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName);
if (subscriber == null) {
getLogger().log(XdsLogLevel.INFO, "Subscribe EDS resource {0}", resourceName);
logger.log(XdsLogLevel.INFO, "Subscribe EDS resource {0}", resourceName);
subscriber = new ResourceSubscriber(ResourceType.EDS, resourceName);
edsResourceSubscribers.put(resourceName, subscriber);
adjustResourceSubscription(ResourceType.EDS);
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.EDS);
}
subscriber.addWatcher(watcher);
}
@ -1973,30 +2082,32 @@ final class ClientXdsClient extends AbstractXdsClient {
@Override
void cancelEdsResourceWatch(final String resourceName, final EdsResourceWatcher watcher) {
getSyncContext().execute(new Runnable() {
syncContext.execute(new Runnable() {
@Override
public void run() {
ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
getLogger().log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName);
logger.log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName);
edsResourceSubscribers.remove(resourceName);
adjustResourceSubscription(ResourceType.EDS);
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.EDS);
}
}
});
}
@Override
ClusterDropStats addClusterDropStats(String clusterName, @Nullable String edsServiceName) {
ClusterDropStats addClusterDropStats(
String clusterName, @Nullable String edsServiceName) {
ClusterDropStats dropCounter =
loadStatsManager.getClusterDropStats(clusterName, edsServiceName);
getSyncContext().execute(new Runnable() {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!reportingLoad) {
lrsClient.startLoadReporting();
// TODO(https://github.com/grpc/grpc-java/issues/8628): consume ServerInfo arg.
serverLrsClientMap.values().iterator().next().startLoadReporting();
reportingLoad = true;
}
}
@ -2005,15 +2116,17 @@ final class ClientXdsClient extends AbstractXdsClient {
}
@Override
ClusterLocalityStats addClusterLocalityStats(String clusterName,
@Nullable String edsServiceName, Locality locality) {
ClusterLocalityStats addClusterLocalityStats(
String clusterName, @Nullable String edsServiceName,
Locality locality) {
ClusterLocalityStats loadCounter =
loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality);
getSyncContext().execute(new Runnable() {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!reportingLoad) {
lrsClient.startLoadReporting();
// TODO(https://github.com/grpc/grpc-java/issues/8628): consume ServerInfo arg.
serverLrsClientMap.values().iterator().next().startLoadReporting();
reportingLoad = true;
}
}
@ -2021,6 +2134,25 @@ final class ClientXdsClient extends AbstractXdsClient {
return loadCounter;
}
@Override
Bootstrapper.BootstrapInfo getBootstrapInfo() {
return bootstrapInfo;
}
// TODO(https://github.com/grpc/grpc-java/issues/8629): remove this
@Override
String getCurrentVersion(ResourceType type) {
if (serverChannelMap.isEmpty()) {
return "";
}
return serverChannelMap.values().iterator().next().getCurrentVersion(type);
}
@Override
public String toString() {
return logId.toString();
}
private void cleanUpResourceTimers() {
for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) {
subscriber.stopTimer();
@ -2037,18 +2169,19 @@ final class ClientXdsClient extends AbstractXdsClient {
}
private void handleResourceUpdate(
ResourceType type, Map<String, ParsedResource> parsedResources, Set<String> invalidResources,
Set<String> retainedResources, String version, String nonce, List<String> errors) {
ServerInfo serverInfo, ResourceType type, Map<String, ParsedResource> parsedResources,
Set<String> invalidResources, Set<String> retainedResources, String version, String nonce,
List<String> errors) {
String errorDetail = null;
if (errors.isEmpty()) {
checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
ackResponse(type, version, nonce);
serverChannelMap.get(serverInfo).ackResponse(type, version, nonce);
} else {
errorDetail = Joiner.on('\n').join(errors);
getLogger().log(XdsLogLevel.WARNING,
logger.log(XdsLogLevel.WARNING,
"Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
type, version, nonce, errorDetail);
nackResponse(type, nonce, errorDetail);
serverChannelMap.get(serverInfo).nackResponse(type, nonce, errorDetail);
}
long updateTime = timeProvider.currentTimeNanos();
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
@ -2123,6 +2256,8 @@ final class ClientXdsClient extends AbstractXdsClient {
* Tracks a single subscribed resource.
*/
private final class ResourceSubscriber {
private final ServerInfo serverInfo;
private final AbstractXdsClient xdsChannel;
private final ResourceType type;
private final String resource;
private final Set<ResourceWatcher> watchers = new HashSet<>();
@ -2132,17 +2267,26 @@ final class ClientXdsClient extends AbstractXdsClient {
private ResourceMetadata metadata;
ResourceSubscriber(ResourceType type, String resource) {
syncContext.throwIfNotInThisSynchronizationContext();
this.type = type;
this.resource = resource;
this.serverInfo = getServerInfo();
// Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
// is created but not yet requested because the client is in backoff.
this.metadata = ResourceMetadata.newResourceMetadataUnknown();
if (isInBackoff()) {
maybeCreateXdsChannelWithLrs(serverInfo);
this.xdsChannel = serverChannelMap.get(serverInfo);
if (xdsChannel.isInBackoff()) {
return;
}
restartTimer();
}
// TODO(zdapeng): add resourceName arg and support xdstp:// resources
private ServerInfo getServerInfo() {
return bootstrapInfo.servers().get(0); // use first server
}
void addWatcher(ResourceWatcher watcher) {
checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher);
watchers.add(watcher);
@ -2165,7 +2309,7 @@ final class ClientXdsClient extends AbstractXdsClient {
class ResourceNotFound implements Runnable {
@Override
public void run() {
getLogger().log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout",
logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout",
type, resource);
respTimer = null;
onAbsent();
@ -2179,9 +2323,9 @@ final class ClientXdsClient extends AbstractXdsClient {
// Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED.
metadata = ResourceMetadata.newResourceMetadataRequested();
respTimer = getSyncContext().schedule(
respTimer = syncContext.schedule(
new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
getTimeService());
timeService);
}
void stopTimer() {
@ -2216,7 +2360,7 @@ final class ClientXdsClient extends AbstractXdsClient {
if (respTimer != null && respTimer.isPending()) { // too early to conclude absence
return;
}
getLogger().log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
if (!absent) {
data = null;
absent = true;
@ -2324,4 +2468,19 @@ final class ClientXdsClient extends AbstractXdsClient {
return errorDetail;
}
}
abstract static class XdsChannelFactory {
static final XdsChannelFactory DEFAULT_XDS_CHANNEL_FACTORY = new XdsChannelFactory() {
@Override
ManagedChannel create(ServerInfo serverInfo) {
String target = serverInfo.target();
ChannelCredentials channelCredentials = serverInfo.channelCredentials();
return Grpc.newChannelBuilder(target, channelCredentials)
.keepAliveTime(5, TimeUnit.MINUTES)
.build();
}
};
abstract ManagedChannel create(ServerInfo serverInfo);
}
}

View File

@ -28,9 +28,9 @@ import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceStub;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.InternalLogId;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
@ -55,7 +55,7 @@ import javax.annotation.Nullable;
final class LoadReportClient {
private final InternalLogId logId;
private final XdsLogger logger;
private final ManagedChannel channel;
private final Channel channel;
private final Context context;
private final boolean useProtocolV3;
private final Node node;
@ -75,7 +75,7 @@ final class LoadReportClient {
LoadReportClient(
LoadStatsManager2 loadStatsManager,
ManagedChannel channel,
Channel channel,
Context context,
boolean useProtocolV3,
Node node,

View File

@ -19,22 +19,18 @@ package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ChannelCredentials;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.TimeProvider;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.ClientXdsClient.XdsChannelFactory;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import io.grpc.xds.internal.sds.TlsContextManagerImpl;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
@ -113,8 +109,6 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
@GuardedBy("lock")
private ScheduledExecutorService scheduler;
@GuardedBy("lock")
private ManagedChannel channel;
@GuardedBy("lock")
private XdsClient xdsClient;
@GuardedBy("lock")
private int refCount;
@ -128,16 +122,16 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
public XdsClient getObject() {
synchronized (lock) {
if (refCount == 0) {
ServerInfo serverInfo = bootstrapInfo.servers().get(0); // use first server
String target = serverInfo.target();
ChannelCredentials channelCredentials = serverInfo.channelCredentials();
channel = Grpc.newChannelBuilder(target, channelCredentials)
.keepAliveTime(5, TimeUnit.MINUTES)
.build();
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
xdsClient = new ClientXdsClient(channel, bootstrapInfo, context, scheduler,
new ExponentialBackoffPolicy.Provider(), GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER, new TlsContextManagerImpl(bootstrapInfo));
xdsClient = new ClientXdsClient(
XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY,
bootstrapInfo,
context,
scheduler,
new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER,
new TlsContextManagerImpl(bootstrapInfo));
}
refCount++;
return xdsClient;
@ -151,21 +145,12 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
if (refCount == 0) {
xdsClient.shutdown();
xdsClient = null;
channel.shutdown();
scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler);
}
return null;
}
}
@VisibleForTesting
@Nullable
ManagedChannel getChannelForTest() {
synchronized (lock) {
return channel;
}
}
@VisibleForTesting
@Nullable
XdsClient getXdsClientForTest() {

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import com.google.protobuf.Any;
import io.grpc.Status;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
import io.grpc.xds.EnvoyServerProtoData.Listener;
@ -31,6 +32,7 @@ import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@ -495,6 +497,7 @@ abstract class XdsClient {
/**
* Returns the latest accepted version of the given resource type.
*/
// TODO(https://github.com/grpc/grpc-java/issues/8629): remove this
String getCurrentVersion(ResourceType type) {
throw new UnsupportedOperationException();
}
@ -566,6 +569,7 @@ abstract class XdsClient {
* use {@link ClusterDropStats#release} to release its <i>hard</i> reference when it is safe to
* stop reporting dropped RPCs for the specified cluster in the future.
*/
// TODO(https://github.com/grpc/grpc-java/issues/8628): add ServerInfo arg
ClusterDropStats addClusterDropStats(String clusterName, @Nullable String edsServiceName) {
throw new UnsupportedOperationException();
}
@ -578,8 +582,48 @@ abstract class XdsClient {
* reference when it is safe to stop reporting RPC loads for the specified locality in the
* future.
*/
// TODO(https://github.com/grpc/grpc-java/issues/8628): add ServerInfo arg
ClusterLocalityStats addClusterLocalityStats(
String clusterName, @Nullable String edsServiceName, Locality locality) {
throw new UnsupportedOperationException();
}
interface XdsResponseHandler {
/** Called when an LDS response is received. */
void handleLdsResponse(
ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce);
/** Called when an RDS response is received. */
void handleRdsResponse(
ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce);
/** Called when an CDS response is received. */
void handleCdsResponse(
ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce);
/** Called when an EDS response is received. */
void handleEdsResponse(
ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce);
/** Called when the ADS stream is closed passively. */
// Must be synchronized.
void handleStreamClosed(Status error);
/** Called when the ADS stream has been recreated. */
// Must be synchronized.
void handleStreamRestarted(ServerInfo serverInfo);
}
interface ResourceStore {
/**
* Returns the collection of resources currently subscribing to or {@code null} if not
* subscribing to any resources for the given type.
*
* <p>Note an empty collection indicates subscribing to resources of the given type with
* wildcard mode.
*/
// Must be synchronized.
@Nullable
Collection<String> getSubscribedResources(ServerInfo serverInfo, ResourceType type);
}
}

View File

@ -59,6 +59,8 @@ import io.grpc.internal.TimeProvider;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.Bootstrapper.CertificateProviderInfo;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.ClientXdsClient.XdsChannelFactory;
import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.Endpoints.LbEndpoint;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
@ -272,6 +274,12 @@ public abstract class ClientXdsClientTestBase {
.start());
channel =
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
XdsChannelFactory xdsChannelFactory = new XdsChannelFactory() {
@Override
ManagedChannel create(ServerInfo serverInfo) {
return channel;
}
};
Bootstrapper.BootstrapInfo bootstrapInfo =
Bootstrapper.BootstrapInfo.builder()
@ -284,7 +292,7 @@ public abstract class ClientXdsClientTestBase {
.build();
xdsClient =
new ClientXdsClient(
channel,
xdsChannelFactory,
bootstrapInfo,
Context.ROOT,
fakeClock.getScheduledExecutorService(),
@ -2325,6 +2333,7 @@ public abstract class ClientXdsClientTestBase {
@Test
public void reportLoadStatsToServer() {
xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher);
String clusterName = "cluster-foo.googleapis.com";
ClusterDropStats dropStats = xdsClient.addClusterDropStats(clusterName, null);
LrsRpcCall lrsCall = loadReportCalls.poll();

View File

@ -23,7 +23,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Bootstrapper.ServerInfo;
@ -90,7 +89,6 @@ public class SharedXdsClientPoolProviderTest {
BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build();
RefCountedXdsClientObjectPool xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo);
assertThat(xdsClientPool.getXdsClientForTest()).isNull();
assertThat(xdsClientPool.getChannelForTest()).isNull();
XdsClient xdsClient = xdsClientPool.getObject();
assertThat(xdsClientPool.getXdsClientForTest()).isNotNull();
xdsClientPool.returnObject(xdsClient);
@ -113,7 +111,6 @@ public class SharedXdsClientPoolProviderTest {
// returnObject twice
assertThat(xdsClientPool.returnObject(xdsClient)).isNull();
assertThat(xdsClient.isShutDown()).isTrue();
assertThat(xdsClientPool.getChannelForTest().isShutdown()).isTrue();
}
@Test
@ -123,14 +120,11 @@ public class SharedXdsClientPoolProviderTest {
BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build();
RefCountedXdsClientObjectPool xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo);
XdsClient xdsClient1 = xdsClientPool.getObject();
ManagedChannel channel1 = xdsClientPool.getChannelForTest();
assertThat(xdsClientPool.returnObject(xdsClient1)).isNull();
assertThat(xdsClient1.isShutDown()).isTrue();
assertThat(channel1.isShutdown()).isTrue();
XdsClient xdsClient2 = xdsClientPool.getObject();
assertThat(xdsClient2).isNotSameInstanceAs(xdsClient1);
assertThat(xdsClientPool.getChannelForTest()).isNotSameInstanceAs(channel1);
xdsClientPool.returnObject(xdsClient2);
}
}