diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java index 0e609ff745..e29949eb8e 100644 --- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java @@ -28,6 +28,7 @@ import com.google.rpc.Code; import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.grpc.Channel; import io.grpc.Context; import io.grpc.InternalLogId; import io.grpc.ManagedChannel; @@ -36,6 +37,11 @@ import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; import io.grpc.stub.StreamObserver; +import io.grpc.xds.Bootstrapper.ServerInfo; +import io.grpc.xds.ClientXdsClient.XdsChannelFactory; +import io.grpc.xds.EnvoyProtoData.Node; +import io.grpc.xds.XdsClient.ResourceStore; +import io.grpc.xds.XdsClient.XdsResponseHandler; import io.grpc.xds.XdsLogger.XdsLogLevel; import java.util.Collection; import java.util.Collections; @@ -48,7 +54,7 @@ import javax.annotation.Nullable; * Common base type for XdsClient implementations, which encapsulates the layer abstraction of * the xDS RPC stream. */ -abstract class AbstractXdsClient extends XdsClient { +final class AbstractXdsClient { private static final String ADS_TYPE_URL_LDS_V2 = "type.googleapis.com/envoy.api.v2.Listener"; private static final String ADS_TYPE_URL_LDS = @@ -66,26 +72,18 @@ abstract class AbstractXdsClient extends XdsClient { private static final String ADS_TYPE_URL_EDS = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; - private final SynchronizationContext syncContext = new SynchronizationContext( - new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - getLogger().log( - XdsLogLevel.ERROR, - "Uncaught exception in XdsClient SynchronizationContext. Panic!", - e); - // TODO(chengyuanzhang): better error handling. - throw new AssertionError(e); - } - }); + private final SynchronizationContext syncContext; private final InternalLogId logId; private final XdsLogger logger; + private final ServerInfo serverInfo; private final ManagedChannel channel; + private final XdsResponseHandler xdsResponseHandler; + private final ResourceStore resourceStore; private final Context context; private final ScheduledExecutorService timeService; private final BackoffPolicy.Provider backoffPolicyProvider; private final Stopwatch stopwatch; - private final Bootstrapper.BootstrapInfo bootstrapInfo; + private final Node bootstrapNode; // Last successfully applied version_info for each resource type. Starts with empty string. // A version_info is used to update management server with client's most recent knowledge of @@ -103,71 +101,42 @@ abstract class AbstractXdsClient extends XdsClient { @Nullable private ScheduledHandle rpcRetryTimer; - AbstractXdsClient(ManagedChannel channel, Bootstrapper.BootstrapInfo bootstrapInfo, - Context context, ScheduledExecutorService timeService, - BackoffPolicy.Provider backoffPolicyProvider, Supplier stopwatchSupplier) { - this.channel = checkNotNull(channel, "channel"); - this.bootstrapInfo = checkNotNull(bootstrapInfo, "bootstrapInfo"); + /** An entity that manages ADS RPCs over a single channel. */ + // TODO: rename to XdsChannel + AbstractXdsClient( + XdsChannelFactory xdsChannelFactory, + ServerInfo serverInfo, + Node bootstrapNode, + XdsResponseHandler xdsResponseHandler, + ResourceStore resourceStore, + Context context, + ScheduledExecutorService + timeService, + SynchronizationContext syncContext, + BackoffPolicy.Provider backoffPolicyProvider, + Supplier stopwatchSupplier) { + this.serverInfo = checkNotNull(serverInfo, "serverInfo"); + this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo); + this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler"); + this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber"); + this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode"); this.context = checkNotNull(context, "context"); this.timeService = checkNotNull(timeService, "timeService"); + this.syncContext = checkNotNull(syncContext, "syncContext"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get(); - logId = InternalLogId.allocate("xds-client", null); + logId = InternalLogId.allocate("xds-client", serverInfo.target()); logger = XdsLogger.withLogId(logId); logger.log(XdsLogLevel.INFO, "Created"); } - /** - * Called when an LDS response is received. - */ - // Must be synchronized. - protected void handleLdsResponse(String versionInfo, List resources, String nonce) { + /** The underlying channel. */ + // Currently, only externally used for LrsClient. + Channel channel() { + return channel; } - /** - * Called when a RDS response is received. - */ - // Must be synchronized. - protected void handleRdsResponse(String versionInfo, List resources, String nonce) { - } - - /** - * Called when a CDS response is received. - */ - // Must be synchronized. - protected void handleCdsResponse(String versionInfo, List resources, String nonce) { - } - - /** - * Called when an EDS response is received. - */ - // Must be synchronized. - protected void handleEdsResponse(String versionInfo, List resources, String nonce) { - } - - /** - * Called when the ADS stream is closed passively. - */ - // Must be synchronized. - protected void handleStreamClosed(Status error) { - } - - /** - * Called when the ADS stream has been recreated. - */ - // Must be synchronized. - protected void handleStreamRestarted() { - } - - /** - * Called when being shut down. - */ - // Must be synchronized. - protected void handleShutdown() { - } - - @Override - final void shutdown() { + void shutdown() { syncContext.execute(new Runnable() { @Override public void run() { @@ -179,49 +148,28 @@ abstract class AbstractXdsClient extends XdsClient { if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { rpcRetryTimer.cancel(); } - handleShutdown(); + channel.shutdown(); } }); } - @Override - boolean isShutDown() { - return shutdown; - } - - @Override - Bootstrapper.BootstrapInfo getBootstrapInfo() { - return bootstrapInfo; - } - @Override public String toString() { return logId.toString(); } - /** - * Returns the collection of resources currently subscribing to or {@code null} if not - * subscribing to any resources for the given type. - * - *

Note an empty collection indicates subscribing to resources of the given type with - * wildcard mode. - */ - // Must be synchronized. - @Nullable - abstract Collection getSubscribedResources(ResourceType type); - /** * Updates the resource subscription for the given resource type. */ // Must be synchronized. - protected final void adjustResourceSubscription(ResourceType type) { + void adjustResourceSubscription(ResourceType type) { if (isInBackoff()) { return; } if (adsStream == null) { startRpcStream(); } - Collection resources = getSubscribedResources(type); + Collection resources = resourceStore.getSubscribedResources(serverInfo, type); if (resources != null) { adsStream.sendDiscoveryRequest(type, resources); } @@ -232,7 +180,7 @@ abstract class AbstractXdsClient extends XdsClient { * and sends an ACK request to the management server. */ // Must be synchronized. - protected final void ackResponse(ResourceType type, String versionInfo, String nonce) { + void ackResponse(ResourceType type, String versionInfo, String nonce) { switch (type) { case LDS: ldsVersion = versionInfo; @@ -252,7 +200,7 @@ abstract class AbstractXdsClient extends XdsClient { } logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}", type, nonce, versionInfo); - Collection resources = getSubscribedResources(type); + Collection resources = resourceStore.getSubscribedResources(serverInfo, type); if (resources == null) { resources = Collections.emptyList(); } @@ -264,34 +212,22 @@ abstract class AbstractXdsClient extends XdsClient { * accepted version) to the management server. */ // Must be synchronized. - protected final void nackResponse(ResourceType type, String nonce, String errorDetail) { + void nackResponse(ResourceType type, String nonce, String errorDetail) { String versionInfo = getCurrentVersion(type); logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}", type, nonce, versionInfo); - Collection resources = getSubscribedResources(type); + Collection resources = resourceStore.getSubscribedResources(serverInfo, type); if (resources == null) { resources = Collections.emptyList(); } adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail); } - protected final SynchronizationContext getSyncContext() { - return syncContext; - } - - protected final ScheduledExecutorService getTimeService() { - return timeService; - } - - protected final XdsLogger getLogger() { - return logger; - } - /** * Returns {@code true} if the resource discovery is currently in backoff. */ // Must be synchronized. - protected final boolean isInBackoff() { + boolean isInBackoff() { return rpcRetryTimer != null && rpcRetryTimer.isPending(); } @@ -302,7 +238,7 @@ abstract class AbstractXdsClient extends XdsClient { // Must be synchronized. private void startRpcStream() { checkState(adsStream == null, "Previous adsStream has not been cleared yet"); - if (bootstrapInfo.servers().get(0).useProtocolV3()) { + if (serverInfo.useProtocolV3()) { adsStream = new AdsStreamV3(); } else { adsStream = new AdsStreamV2(); @@ -317,8 +253,8 @@ abstract class AbstractXdsClient extends XdsClient { stopwatch.reset().start(); } + /** Returns the latest accepted version of the given resource type. */ // Must be synchronized. - @Override String getCurrentVersion(ResourceType type) { String version; switch (type) { @@ -353,16 +289,16 @@ abstract class AbstractXdsClient extends XdsClient { if (type == ResourceType.UNKNOWN) { continue; } - Collection resources = getSubscribedResources(type); + Collection resources = resourceStore.getSubscribedResources(serverInfo, type); if (resources != null) { adsStream.sendDiscoveryRequest(type, resources); } } - handleStreamRestarted(); + xdsResponseHandler.handleStreamRestarted(serverInfo); } } - protected enum ResourceType { + enum ResourceType { UNKNOWN, LDS, RDS, CDS, EDS; String typeUrl() { @@ -488,19 +424,19 @@ abstract class AbstractXdsClient extends XdsClient { switch (type) { case LDS: ldsRespNonce = nonce; - handleLdsResponse(versionInfo, resources, nonce); + xdsResponseHandler.handleLdsResponse(serverInfo, versionInfo, resources, nonce); break; case RDS: rdsRespNonce = nonce; - handleRdsResponse(versionInfo, resources, nonce); + xdsResponseHandler.handleRdsResponse(serverInfo, versionInfo, resources, nonce); break; case CDS: cdsRespNonce = nonce; - handleCdsResponse(versionInfo, resources, nonce); + xdsResponseHandler.handleCdsResponse(serverInfo, versionInfo, resources, nonce); break; case EDS: edsRespNonce = nonce; - handleEdsResponse(versionInfo, resources, nonce); + xdsResponseHandler.handleEdsResponse(serverInfo, versionInfo, resources, nonce); break; case UNKNOWN: default: @@ -526,7 +462,7 @@ abstract class AbstractXdsClient extends XdsClient { "ADS stream closed with status {0}: {1}. Cause: {2}", error.getCode(), error.getDescription(), error.getCause()); closed = true; - handleStreamClosed(error); + xdsResponseHandler.handleStreamClosed(error); cleanUp(); if (responseReceived || retryBackoffPolicy == null) { // Reset the backoff sequence if had received a response, or backoff sequence @@ -619,7 +555,7 @@ abstract class AbstractXdsClient extends XdsClient { io.envoyproxy.envoy.api.v2.DiscoveryRequest.Builder builder = io.envoyproxy.envoy.api.v2.DiscoveryRequest.newBuilder() .setVersionInfo(versionInfo) - .setNode(bootstrapInfo.node().toEnvoyProtoNodeV2()) + .setNode(bootstrapNode.toEnvoyProtoNodeV2()) .addAllResourceNames(resources) .setTypeUrl(type.typeUrlV2()) .setResponseNonce(nonce); @@ -699,7 +635,7 @@ abstract class AbstractXdsClient extends XdsClient { DiscoveryRequest.Builder builder = DiscoveryRequest.newBuilder() .setVersionInfo(versionInfo) - .setNode(bootstrapInfo.node().toEnvoyProtoNode()) + .setNode(bootstrapNode.toEnvoyProtoNode()) .addAllResourceNames(resources) .setTypeUrl(type.typeUrl()) .setResponseNonce(nonce); diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 2a4405f45f..92b1cfd3f7 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -27,6 +27,7 @@ import com.google.common.base.Stopwatch; import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.protobuf.Any; import com.google.protobuf.Duration; import com.google.protobuf.InvalidProtocolBufferException; @@ -56,14 +57,20 @@ import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext; import io.envoyproxy.envoy.type.v3.FractionalPercent; import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType; +import io.grpc.ChannelCredentials; import io.grpc.Context; import io.grpc.EquivalentAddressGroup; +import io.grpc.Grpc; +import io.grpc.InternalLogId; import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.Status.Code; +import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; import io.grpc.internal.TimeProvider; +import io.grpc.xds.AbstractXdsClient.ResourceType; +import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LbEndpoint; import io.grpc.xds.Endpoints.LocalityLbEndpoints; @@ -85,6 +92,8 @@ import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy; import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy; import io.grpc.xds.VirtualHost.Route.RouteMatch; import io.grpc.xds.VirtualHost.Route.RouteMatch.PathMatcher; +import io.grpc.xds.XdsClient.ResourceStore; +import io.grpc.xds.XdsClient.XdsResponseHandler; import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.internal.Matchers.FractionMatcher; import io.grpc.xds.internal.Matchers.HeaderMatcher; @@ -111,7 +120,7 @@ import javax.annotation.Nullable; /** * XdsClient implementation for client side usages. */ -final class ClientXdsClient extends AbstractXdsClient { +final class ClientXdsClient extends XdsClient implements XdsResponseHandler, ResourceStore { // Longest time to wait, since the subscription to some resource, for concluding its absence. @VisibleForTesting @@ -164,33 +173,90 @@ final class ClientXdsClient extends AbstractXdsClient { Code.CANCELLED, Code.DEADLINE_EXCEEDED, Code.INTERNAL, Code.RESOURCE_EXHAUSTED, Code.UNAVAILABLE)); + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + logger.log( + XdsLogLevel.ERROR, + "Uncaught exception in XdsClient SynchronizationContext. Panic!", + e); + // TODO(chengyuanzhang): better error handling. + throw new AssertionError(e); + } + }); private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry(); + private final Map serverChannelMap = new HashMap<>(); private final Map ldsResourceSubscribers = new HashMap<>(); private final Map rdsResourceSubscribers = new HashMap<>(); private final Map cdsResourceSubscribers = new HashMap<>(); private final Map edsResourceSubscribers = new HashMap<>(); private final LoadStatsManager2 loadStatsManager; - private final LoadReportClient lrsClient; + private final Map serverLrsClientMap = new HashMap<>(); + private final XdsChannelFactory xdsChannelFactory; + private final Bootstrapper.BootstrapInfo bootstrapInfo; + private final Context context; + private final ScheduledExecutorService timeService; + private final BackoffPolicy.Provider backoffPolicyProvider; + private final Supplier stopwatchSupplier; private final TimeProvider timeProvider; private boolean reportingLoad; private final TlsContextManager tlsContextManager; + private final InternalLogId logId; + private final XdsLogger logger; + private volatile boolean isShutdown; + // TODO(zdapeng): rename to XdsClientImpl ClientXdsClient( - ManagedChannel channel, Bootstrapper.BootstrapInfo bootstrapInfo, Context context, - ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider, - Supplier stopwatchSupplier, TimeProvider timeProvider, + XdsChannelFactory xdsChannelFactory, + Bootstrapper.BootstrapInfo bootstrapInfo, + Context context, + ScheduledExecutorService timeService, + BackoffPolicy.Provider backoffPolicyProvider, + Supplier stopwatchSupplier, + TimeProvider timeProvider, TlsContextManager tlsContextManager) { - super(channel, bootstrapInfo, context, timeService, backoffPolicyProvider, stopwatchSupplier); + this.xdsChannelFactory = xdsChannelFactory; + this.bootstrapInfo = bootstrapInfo; + this.context = context; + this.timeService = timeService; loadStatsManager = new LoadStatsManager2(stopwatchSupplier); + this.backoffPolicyProvider = backoffPolicyProvider; + this.stopwatchSupplier = stopwatchSupplier; this.timeProvider = timeProvider; this.tlsContextManager = checkNotNull(tlsContextManager, "tlsContextManager"); - lrsClient = new LoadReportClient(loadStatsManager, channel, context, - bootstrapInfo.servers().get(0).useProtocolV3(), bootstrapInfo.node(), - getSyncContext(), timeService, backoffPolicyProvider, stopwatchSupplier); + logId = InternalLogId.allocate("xds-client", null); + logger = XdsLogger.withLogId(logId); + logger.log(XdsLogLevel.INFO, "Created"); + } + + private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) { + syncContext.throwIfNotInThisSynchronizationContext(); + if (serverChannelMap.containsKey(serverInfo)) { + return; + } + AbstractXdsClient xdsChannel = new AbstractXdsClient( + xdsChannelFactory, + serverInfo, + bootstrapInfo.node(), + this, + this, + context, + timeService, + syncContext, + backoffPolicyProvider, + stopwatchSupplier); + LoadReportClient lrsClient = new LoadReportClient( + loadStatsManager, xdsChannel.channel(), context, serverInfo.useProtocolV3(), + bootstrapInfo.node(), syncContext, timeService, backoffPolicyProvider, stopwatchSupplier); + serverChannelMap.put(serverInfo, xdsChannel); + serverLrsClientMap.put(serverInfo, lrsClient); } @Override - protected void handleLdsResponse(String versionInfo, List resources, String nonce) { + public void handleLdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce) { + syncContext.throwIfNotInThisSynchronizationContext(); Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); Set invalidResources = new HashSet<>(); @@ -233,12 +299,12 @@ final class ClientXdsClient extends AbstractXdsClient { // LdsUpdate parsed successfully. parsedResources.put(listenerName, new ParsedResource(ldsUpdate, resource)); } - getLogger().log(XdsLogLevel.INFO, + logger.log(XdsLogLevel.INFO, "Received LDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); handleResourceUpdate( - ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, versionInfo, - nonce, errors); + serverInfo, ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, + versionInfo, nonce, errors); } private LdsUpdate processClientSideListener( @@ -1307,7 +1373,9 @@ final class ClientXdsClient extends AbstractXdsClient { } @Override - protected void handleRdsResponse(String versionInfo, List resources, String nonce) { + public void handleRdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce) { + syncContext.throwIfNotInThisSynchronizationContext(); Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); Set invalidResources = new HashSet<>(); @@ -1344,12 +1412,12 @@ final class ClientXdsClient extends AbstractXdsClient { parsedResources.put(routeConfigName, new ParsedResource(rdsUpdate, resource)); } - getLogger().log(XdsLogLevel.INFO, + logger.log(XdsLogLevel.INFO, "Received RDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); handleResourceUpdate( - ResourceType.RDS, parsedResources, invalidResources, Collections.emptySet(), - versionInfo, nonce, errors); + serverInfo, ResourceType.RDS, parsedResources, invalidResources, + Collections.emptySet(), versionInfo, nonce, errors); } private static RdsUpdate processRouteConfiguration( @@ -1370,7 +1438,9 @@ final class ClientXdsClient extends AbstractXdsClient { } @Override - protected void handleCdsResponse(String versionInfo, List resources, String nonce) { + public void handleCdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce) { + syncContext.throwIfNotInThisSynchronizationContext(); Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); Set invalidResources = new HashSet<>(); @@ -1415,12 +1485,12 @@ final class ClientXdsClient extends AbstractXdsClient { } parsedResources.put(clusterName, new ParsedResource(cdsUpdate, resource)); } - getLogger().log(XdsLogLevel.INFO, + logger.log(XdsLogLevel.INFO, "Received CDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); handleResourceUpdate( - ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources, versionInfo, - nonce, errors); + serverInfo, ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources, + versionInfo, nonce, errors); } @VisibleForTesting @@ -1598,7 +1668,9 @@ final class ClientXdsClient extends AbstractXdsClient { } @Override - protected void handleEdsResponse(String versionInfo, List resources, String nonce) { + public void handleEdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce) { + syncContext.throwIfNotInThisSynchronizationContext(); Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); Set invalidResources = new HashSet<>(); @@ -1641,12 +1713,12 @@ final class ClientXdsClient extends AbstractXdsClient { } parsedResources.put(clusterName, new ParsedResource(edsUpdate, resource)); } - getLogger().log( + logger.log( XdsLogLevel.INFO, "Received EDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); handleResourceUpdate( - ResourceType.EDS, parsedResources, invalidResources, Collections.emptySet(), - versionInfo, nonce, errors); + serverInfo, ResourceType.EDS, parsedResources, invalidResources, + Collections.emptySet(), versionInfo, nonce, errors); } private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment) @@ -1775,7 +1847,8 @@ final class ClientXdsClient extends AbstractXdsClient { } @Override - protected void handleStreamClosed(Status error) { + public void handleStreamClosed(Status error) { + syncContext.throwIfNotInThisSynchronizationContext(); cleanUpResourceTimers(); for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) { subscriber.onError(error); @@ -1792,27 +1865,56 @@ final class ClientXdsClient extends AbstractXdsClient { } @Override - protected void handleStreamRestarted() { + public void handleStreamRestarted(ServerInfo serverInfo) { + syncContext.throwIfNotInThisSynchronizationContext(); for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) { - subscriber.restartTimer(); + if (subscriber.serverInfo.equals(serverInfo)) { + subscriber.restartTimer(); + } } for (ResourceSubscriber subscriber : rdsResourceSubscribers.values()) { - subscriber.restartTimer(); + if (subscriber.serverInfo.equals(serverInfo)) { + subscriber.restartTimer(); + } } for (ResourceSubscriber subscriber : cdsResourceSubscribers.values()) { - subscriber.restartTimer(); + if (subscriber.serverInfo.equals(serverInfo)) { + subscriber.restartTimer(); + } } for (ResourceSubscriber subscriber : edsResourceSubscribers.values()) { - subscriber.restartTimer(); + if (subscriber.serverInfo.equals(serverInfo)) { + subscriber.restartTimer(); + } } } @Override - protected void handleShutdown() { - if (reportingLoad) { - lrsClient.stopLoadReporting(); - } - cleanUpResourceTimers(); + void shutdown() { + syncContext.execute( + new Runnable() { + @Override + public void run() { + if (isShutdown) { + return; + } + isShutdown = true; + for (AbstractXdsClient xdsChannel : serverChannelMap.values()) { + xdsChannel.shutdown(); + } + if (reportingLoad) { + for (final LoadReportClient lrsClient : serverLrsClientMap.values()) { + lrsClient.stopLoadReporting(); + } + } + cleanUpResourceTimers(); + } + }); + } + + @Override + boolean isShutDown() { + return isShutdown; } private Map getSubscribedResourcesMap(ResourceType type) { @@ -1833,9 +1935,16 @@ final class ClientXdsClient extends AbstractXdsClient { @Nullable @Override - Collection getSubscribedResources(ResourceType type) { + public Collection getSubscribedResources(ServerInfo serverInfo, ResourceType type) { Map resources = getSubscribedResourcesMap(type); - return resources.isEmpty() ? null : resources.keySet(); + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (String key : resources.keySet()) { + if (resources.get(key).serverInfo.equals(serverInfo)) { + builder.add(key); + } + } + Collection retVal = builder.build(); + return retVal.isEmpty() ? null : retVal; } @Override @@ -1854,15 +1963,15 @@ final class ClientXdsClient extends AbstractXdsClient { @Override void watchLdsResource(final String resourceName, final LdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName); if (subscriber == null) { - getLogger().log(XdsLogLevel.INFO, "Subscribe LDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Subscribe LDS resource {0}", resourceName); subscriber = new ResourceSubscriber(ResourceType.LDS, resourceName); ldsResourceSubscribers.put(resourceName, subscriber); - adjustResourceSubscription(ResourceType.LDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.LDS); } subscriber.addWatcher(watcher); } @@ -1871,16 +1980,16 @@ final class ClientXdsClient extends AbstractXdsClient { @Override void cancelLdsResourceWatch(final String resourceName, final LdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.stopTimer(); - getLogger().log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName); ldsResourceSubscribers.remove(resourceName); - adjustResourceSubscription(ResourceType.LDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.LDS); } } }); @@ -1888,15 +1997,15 @@ final class ClientXdsClient extends AbstractXdsClient { @Override void watchRdsResource(final String resourceName, final RdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName); if (subscriber == null) { - getLogger().log(XdsLogLevel.INFO, "Subscribe RDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Subscribe RDS resource {0}", resourceName); subscriber = new ResourceSubscriber(ResourceType.RDS, resourceName); rdsResourceSubscribers.put(resourceName, subscriber); - adjustResourceSubscription(ResourceType.RDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.RDS); } subscriber.addWatcher(watcher); } @@ -1905,16 +2014,16 @@ final class ClientXdsClient extends AbstractXdsClient { @Override void cancelRdsResourceWatch(final String resourceName, final RdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.stopTimer(); - getLogger().log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName); rdsResourceSubscribers.remove(resourceName); - adjustResourceSubscription(ResourceType.RDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.RDS); } } }); @@ -1922,15 +2031,15 @@ final class ClientXdsClient extends AbstractXdsClient { @Override void watchCdsResource(final String resourceName, final CdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName); if (subscriber == null) { - getLogger().log(XdsLogLevel.INFO, "Subscribe CDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Subscribe CDS resource {0}", resourceName); subscriber = new ResourceSubscriber(ResourceType.CDS, resourceName); cdsResourceSubscribers.put(resourceName, subscriber); - adjustResourceSubscription(ResourceType.CDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.CDS); } subscriber.addWatcher(watcher); } @@ -1939,16 +2048,16 @@ final class ClientXdsClient extends AbstractXdsClient { @Override void cancelCdsResourceWatch(final String resourceName, final CdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.stopTimer(); - getLogger().log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName); cdsResourceSubscribers.remove(resourceName); - adjustResourceSubscription(ResourceType.CDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.CDS); } } }); @@ -1956,15 +2065,15 @@ final class ClientXdsClient extends AbstractXdsClient { @Override void watchEdsResource(final String resourceName, final EdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName); if (subscriber == null) { - getLogger().log(XdsLogLevel.INFO, "Subscribe EDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Subscribe EDS resource {0}", resourceName); subscriber = new ResourceSubscriber(ResourceType.EDS, resourceName); edsResourceSubscribers.put(resourceName, subscriber); - adjustResourceSubscription(ResourceType.EDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.EDS); } subscriber.addWatcher(watcher); } @@ -1973,30 +2082,32 @@ final class ClientXdsClient extends AbstractXdsClient { @Override void cancelEdsResourceWatch(final String resourceName, final EdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.stopTimer(); - getLogger().log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName); edsResourceSubscribers.remove(resourceName); - adjustResourceSubscription(ResourceType.EDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.EDS); } } }); } @Override - ClusterDropStats addClusterDropStats(String clusterName, @Nullable String edsServiceName) { + ClusterDropStats addClusterDropStats( + String clusterName, @Nullable String edsServiceName) { ClusterDropStats dropCounter = loadStatsManager.getClusterDropStats(clusterName, edsServiceName); - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { if (!reportingLoad) { - lrsClient.startLoadReporting(); + // TODO(https://github.com/grpc/grpc-java/issues/8628): consume ServerInfo arg. + serverLrsClientMap.values().iterator().next().startLoadReporting(); reportingLoad = true; } } @@ -2005,15 +2116,17 @@ final class ClientXdsClient extends AbstractXdsClient { } @Override - ClusterLocalityStats addClusterLocalityStats(String clusterName, - @Nullable String edsServiceName, Locality locality) { + ClusterLocalityStats addClusterLocalityStats( + String clusterName, @Nullable String edsServiceName, + Locality locality) { ClusterLocalityStats loadCounter = loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality); - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { if (!reportingLoad) { - lrsClient.startLoadReporting(); + // TODO(https://github.com/grpc/grpc-java/issues/8628): consume ServerInfo arg. + serverLrsClientMap.values().iterator().next().startLoadReporting(); reportingLoad = true; } } @@ -2021,6 +2134,25 @@ final class ClientXdsClient extends AbstractXdsClient { return loadCounter; } + @Override + Bootstrapper.BootstrapInfo getBootstrapInfo() { + return bootstrapInfo; + } + + // TODO(https://github.com/grpc/grpc-java/issues/8629): remove this + @Override + String getCurrentVersion(ResourceType type) { + if (serverChannelMap.isEmpty()) { + return ""; + } + return serverChannelMap.values().iterator().next().getCurrentVersion(type); + } + + @Override + public String toString() { + return logId.toString(); + } + private void cleanUpResourceTimers() { for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) { subscriber.stopTimer(); @@ -2037,18 +2169,19 @@ final class ClientXdsClient extends AbstractXdsClient { } private void handleResourceUpdate( - ResourceType type, Map parsedResources, Set invalidResources, - Set retainedResources, String version, String nonce, List errors) { + ServerInfo serverInfo, ResourceType type, Map parsedResources, + Set invalidResources, Set retainedResources, String version, String nonce, + List errors) { String errorDetail = null; if (errors.isEmpty()) { checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors"); - ackResponse(type, version, nonce); + serverChannelMap.get(serverInfo).ackResponse(type, version, nonce); } else { errorDetail = Joiner.on('\n').join(errors); - getLogger().log(XdsLogLevel.WARNING, + logger.log(XdsLogLevel.WARNING, "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}", type, version, nonce, errorDetail); - nackResponse(type, nonce, errorDetail); + serverChannelMap.get(serverInfo).nackResponse(type, nonce, errorDetail); } long updateTime = timeProvider.currentTimeNanos(); for (Map.Entry entry : getSubscribedResourcesMap(type).entrySet()) { @@ -2123,6 +2256,8 @@ final class ClientXdsClient extends AbstractXdsClient { * Tracks a single subscribed resource. */ private final class ResourceSubscriber { + private final ServerInfo serverInfo; + private final AbstractXdsClient xdsChannel; private final ResourceType type; private final String resource; private final Set watchers = new HashSet<>(); @@ -2132,17 +2267,26 @@ final class ClientXdsClient extends AbstractXdsClient { private ResourceMetadata metadata; ResourceSubscriber(ResourceType type, String resource) { + syncContext.throwIfNotInThisSynchronizationContext(); this.type = type; this.resource = resource; + this.serverInfo = getServerInfo(); // Initialize metadata in UNKNOWN state to cover the case when resource subscriber, // is created but not yet requested because the client is in backoff. this.metadata = ResourceMetadata.newResourceMetadataUnknown(); - if (isInBackoff()) { + maybeCreateXdsChannelWithLrs(serverInfo); + this.xdsChannel = serverChannelMap.get(serverInfo); + if (xdsChannel.isInBackoff()) { return; } restartTimer(); } + // TODO(zdapeng): add resourceName arg and support xdstp:// resources + private ServerInfo getServerInfo() { + return bootstrapInfo.servers().get(0); // use first server + } + void addWatcher(ResourceWatcher watcher) { checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher); watchers.add(watcher); @@ -2165,7 +2309,7 @@ final class ClientXdsClient extends AbstractXdsClient { class ResourceNotFound implements Runnable { @Override public void run() { - getLogger().log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout", + logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout", type, resource); respTimer = null; onAbsent(); @@ -2179,9 +2323,9 @@ final class ClientXdsClient extends AbstractXdsClient { // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED. metadata = ResourceMetadata.newResourceMetadataRequested(); - respTimer = getSyncContext().schedule( + respTimer = syncContext.schedule( new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, - getTimeService()); + timeService); } void stopTimer() { @@ -2216,7 +2360,7 @@ final class ClientXdsClient extends AbstractXdsClient { if (respTimer != null && respTimer.isPending()) { // too early to conclude absence return; } - getLogger().log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource); + logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource); if (!absent) { data = null; absent = true; @@ -2324,4 +2468,19 @@ final class ClientXdsClient extends AbstractXdsClient { return errorDetail; } } + + abstract static class XdsChannelFactory { + static final XdsChannelFactory DEFAULT_XDS_CHANNEL_FACTORY = new XdsChannelFactory() { + @Override + ManagedChannel create(ServerInfo serverInfo) { + String target = serverInfo.target(); + ChannelCredentials channelCredentials = serverInfo.channelCredentials(); + return Grpc.newChannelBuilder(target, channelCredentials) + .keepAliveTime(5, TimeUnit.MINUTES) + .build(); + } + }; + + abstract ManagedChannel create(ServerInfo serverInfo); + } } diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java index 54fa20128b..af2a673e9f 100644 --- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java @@ -28,9 +28,9 @@ import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc; import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceStub; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; +import io.grpc.Channel; import io.grpc.Context; import io.grpc.InternalLogId; -import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; @@ -55,7 +55,7 @@ import javax.annotation.Nullable; final class LoadReportClient { private final InternalLogId logId; private final XdsLogger logger; - private final ManagedChannel channel; + private final Channel channel; private final Context context; private final boolean useProtocolV3; private final Node node; @@ -75,7 +75,7 @@ final class LoadReportClient { LoadReportClient( LoadStatsManager2 loadStatsManager, - ManagedChannel channel, + Channel channel, Context context, boolean useProtocolV3, Node node, diff --git a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java index 14bdced5da..1c8fe0bad6 100644 --- a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java @@ -19,22 +19,18 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; -import io.grpc.ChannelCredentials; import io.grpc.Context; -import io.grpc.Grpc; -import io.grpc.ManagedChannel; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ObjectPool; import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.TimeProvider; import io.grpc.xds.Bootstrapper.BootstrapInfo; -import io.grpc.xds.Bootstrapper.ServerInfo; +import io.grpc.xds.ClientXdsClient.XdsChannelFactory; import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; import io.grpc.xds.internal.sds.TlsContextManagerImpl; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -113,8 +109,6 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory { @GuardedBy("lock") private ScheduledExecutorService scheduler; @GuardedBy("lock") - private ManagedChannel channel; - @GuardedBy("lock") private XdsClient xdsClient; @GuardedBy("lock") private int refCount; @@ -128,16 +122,16 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory { public XdsClient getObject() { synchronized (lock) { if (refCount == 0) { - ServerInfo serverInfo = bootstrapInfo.servers().get(0); // use first server - String target = serverInfo.target(); - ChannelCredentials channelCredentials = serverInfo.channelCredentials(); - channel = Grpc.newChannelBuilder(target, channelCredentials) - .keepAliveTime(5, TimeUnit.MINUTES) - .build(); scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); - xdsClient = new ClientXdsClient(channel, bootstrapInfo, context, scheduler, - new ExponentialBackoffPolicy.Provider(), GrpcUtil.STOPWATCH_SUPPLIER, - TimeProvider.SYSTEM_TIME_PROVIDER, new TlsContextManagerImpl(bootstrapInfo)); + xdsClient = new ClientXdsClient( + XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY, + bootstrapInfo, + context, + scheduler, + new ExponentialBackoffPolicy.Provider(), + GrpcUtil.STOPWATCH_SUPPLIER, + TimeProvider.SYSTEM_TIME_PROVIDER, + new TlsContextManagerImpl(bootstrapInfo)); } refCount++; return xdsClient; @@ -151,21 +145,12 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory { if (refCount == 0) { xdsClient.shutdown(); xdsClient = null; - channel.shutdown(); scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler); } return null; } } - @VisibleForTesting - @Nullable - ManagedChannel getChannelForTest() { - synchronized (lock) { - return channel; - } - } - @VisibleForTesting @Nullable XdsClient getXdsClientForTest() { diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 6b6be57f04..1daa257e54 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.Any; import io.grpc.Status; import io.grpc.xds.AbstractXdsClient.ResourceType; +import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LocalityLbEndpoints; import io.grpc.xds.EnvoyServerProtoData.Listener; @@ -31,6 +32,7 @@ import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -495,6 +497,7 @@ abstract class XdsClient { /** * Returns the latest accepted version of the given resource type. */ + // TODO(https://github.com/grpc/grpc-java/issues/8629): remove this String getCurrentVersion(ResourceType type) { throw new UnsupportedOperationException(); } @@ -566,6 +569,7 @@ abstract class XdsClient { * use {@link ClusterDropStats#release} to release its hard reference when it is safe to * stop reporting dropped RPCs for the specified cluster in the future. */ + // TODO(https://github.com/grpc/grpc-java/issues/8628): add ServerInfo arg ClusterDropStats addClusterDropStats(String clusterName, @Nullable String edsServiceName) { throw new UnsupportedOperationException(); } @@ -578,8 +582,48 @@ abstract class XdsClient { * reference when it is safe to stop reporting RPC loads for the specified locality in the * future. */ + // TODO(https://github.com/grpc/grpc-java/issues/8628): add ServerInfo arg ClusterLocalityStats addClusterLocalityStats( String clusterName, @Nullable String edsServiceName, Locality locality) { throw new UnsupportedOperationException(); } + + interface XdsResponseHandler { + /** Called when an LDS response is received. */ + void handleLdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce); + + /** Called when an RDS response is received. */ + void handleRdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce); + + /** Called when an CDS response is received. */ + void handleCdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce); + + /** Called when an EDS response is received. */ + void handleEdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce); + + /** Called when the ADS stream is closed passively. */ + // Must be synchronized. + void handleStreamClosed(Status error); + + /** Called when the ADS stream has been recreated. */ + // Must be synchronized. + void handleStreamRestarted(ServerInfo serverInfo); + } + + interface ResourceStore { + /** + * Returns the collection of resources currently subscribing to or {@code null} if not + * subscribing to any resources for the given type. + * + *

Note an empty collection indicates subscribing to resources of the given type with + * wildcard mode. + */ + // Must be synchronized. + @Nullable + Collection getSubscribedResources(ServerInfo serverInfo, ResourceType type); + } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index 5fa9e3da73..9809738c68 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -59,6 +59,8 @@ import io.grpc.internal.TimeProvider; import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.AbstractXdsClient.ResourceType; import io.grpc.xds.Bootstrapper.CertificateProviderInfo; +import io.grpc.xds.Bootstrapper.ServerInfo; +import io.grpc.xds.ClientXdsClient.XdsChannelFactory; import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LbEndpoint; import io.grpc.xds.Endpoints.LocalityLbEndpoints; @@ -272,6 +274,12 @@ public abstract class ClientXdsClientTestBase { .start()); channel = cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); + XdsChannelFactory xdsChannelFactory = new XdsChannelFactory() { + @Override + ManagedChannel create(ServerInfo serverInfo) { + return channel; + } + }; Bootstrapper.BootstrapInfo bootstrapInfo = Bootstrapper.BootstrapInfo.builder() @@ -284,7 +292,7 @@ public abstract class ClientXdsClientTestBase { .build(); xdsClient = new ClientXdsClient( - channel, + xdsChannelFactory, bootstrapInfo, Context.ROOT, fakeClock.getScheduledExecutorService(), @@ -2325,6 +2333,7 @@ public abstract class ClientXdsClientTestBase { @Test public void reportLoadStatsToServer() { + xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); String clusterName = "cluster-foo.googleapis.com"; ClusterDropStats dropStats = xdsClient.addClusterDropStats(clusterName, null); LrsRpcCall lrsCall = loadReportCalls.poll(); diff --git a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java index 6a3cba4ac3..14a8f1ce74 100644 --- a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java +++ b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import io.grpc.InsecureChannelCredentials; -import io.grpc.ManagedChannel; import io.grpc.internal.ObjectPool; import io.grpc.xds.Bootstrapper.BootstrapInfo; import io.grpc.xds.Bootstrapper.ServerInfo; @@ -90,7 +89,6 @@ public class SharedXdsClientPoolProviderTest { BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); RefCountedXdsClientObjectPool xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo); assertThat(xdsClientPool.getXdsClientForTest()).isNull(); - assertThat(xdsClientPool.getChannelForTest()).isNull(); XdsClient xdsClient = xdsClientPool.getObject(); assertThat(xdsClientPool.getXdsClientForTest()).isNotNull(); xdsClientPool.returnObject(xdsClient); @@ -113,7 +111,6 @@ public class SharedXdsClientPoolProviderTest { // returnObject twice assertThat(xdsClientPool.returnObject(xdsClient)).isNull(); assertThat(xdsClient.isShutDown()).isTrue(); - assertThat(xdsClientPool.getChannelForTest().isShutdown()).isTrue(); } @Test @@ -123,14 +120,11 @@ public class SharedXdsClientPoolProviderTest { BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); RefCountedXdsClientObjectPool xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo); XdsClient xdsClient1 = xdsClientPool.getObject(); - ManagedChannel channel1 = xdsClientPool.getChannelForTest(); assertThat(xdsClientPool.returnObject(xdsClient1)).isNull(); assertThat(xdsClient1.isShutDown()).isTrue(); - assertThat(channel1.isShutdown()).isTrue(); XdsClient xdsClient2 = xdsClientPool.getObject(); assertThat(xdsClient2).isNotSameInstanceAs(xdsClient1); - assertThat(xdsClientPool.getChannelForTest()).isNotSameInstanceAs(channel1); xdsClientPool.returnObject(xdsClient2); } }