mirror of https://github.com/grpc/grpc-java.git
xds: Revert xds flow control change. (#10784)
* Revert "xds: fix flow control test failure (#10773)" This reverts commitf67ec2ecd9
. * Revert "xDS: implement ADS stream flow control mechanism (#10674)" This reverts commit0a704a52ee
.
This commit is contained in:
parent
f67ec2ecd9
commit
846e008399
|
@ -320,7 +320,7 @@ final class CdsLoadBalancer2 extends LoadBalancer {
|
|||
|
||||
private void start() {
|
||||
shutdown = false;
|
||||
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext);
|
||||
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this);
|
||||
}
|
||||
|
||||
void shutdown() {
|
||||
|
@ -341,85 +341,102 @@ final class CdsLoadBalancer2 extends LoadBalancer {
|
|||
String.format("Unable to load CDS %s. xDS server returned: %s: %s",
|
||||
name, error.getCode(), error.getDescription()))
|
||||
.withCause(error.getCause());
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
// All watchers should receive the same error, so we only propagate it once.
|
||||
if (ClusterState.this == root) {
|
||||
handleClusterDiscoveryError(status);
|
||||
}
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
// All watchers should receive the same error, so we only propagate it once.
|
||||
if (ClusterState.this == root) {
|
||||
handleClusterDiscoveryError(status);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResourceDoesNotExist(String resourceName) {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
discovered = true;
|
||||
result = null;
|
||||
if (childClusterStates != null) {
|
||||
for (ClusterState state : childClusterStates.values()) {
|
||||
state.shutdown();
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
discovered = true;
|
||||
result = null;
|
||||
if (childClusterStates != null) {
|
||||
for (ClusterState state : childClusterStates.values()) {
|
||||
state.shutdown();
|
||||
}
|
||||
childClusterStates = null;
|
||||
}
|
||||
handleClusterDiscovered();
|
||||
}
|
||||
childClusterStates = null;
|
||||
}
|
||||
handleClusterDiscovered();
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onChanged(final CdsUpdate update) {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
|
||||
discovered = true;
|
||||
result = update;
|
||||
if (update.clusterType() == ClusterType.AGGREGATE) {
|
||||
isLeaf = false;
|
||||
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
|
||||
update.clusterName(), update.prioritizedClusterNames());
|
||||
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
|
||||
for (String cluster : update.prioritizedClusterNames()) {
|
||||
if (newChildStates.containsKey(cluster)) {
|
||||
logger.log(XdsLogLevel.WARNING,
|
||||
String.format("duplicate cluster name %s in aggregate %s is being ignored",
|
||||
cluster, update.clusterName()));
|
||||
continue;
|
||||
class ClusterDiscovered implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
|
||||
ClusterState childState;
|
||||
if (clusterStates.containsKey(cluster)) {
|
||||
childState = clusterStates.get(cluster);
|
||||
if (childState.shutdown) {
|
||||
childState.start();
|
||||
}
|
||||
} else {
|
||||
childState = new ClusterState(cluster);
|
||||
clusterStates.put(cluster, childState);
|
||||
childState.start();
|
||||
}
|
||||
newChildStates.put(cluster, childState);
|
||||
} else {
|
||||
newChildStates.put(cluster, childClusterStates.remove(cluster));
|
||||
}
|
||||
}
|
||||
if (childClusterStates != null) { // stop subscribing to revoked child clusters
|
||||
for (ClusterState watcher : childClusterStates.values()) {
|
||||
watcher.shutdown();
|
||||
}
|
||||
}
|
||||
childClusterStates = newChildStates;
|
||||
} else if (update.clusterType() == ClusterType.EDS) {
|
||||
isLeaf = true;
|
||||
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
|
||||
update.clusterName(), update.edsServiceName());
|
||||
} else { // logical DNS
|
||||
isLeaf = true;
|
||||
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
|
||||
}
|
||||
handleClusterDiscovered();
|
||||
}
|
||||
|
||||
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
|
||||
discovered = true;
|
||||
result = update;
|
||||
if (update.clusterType() == ClusterType.AGGREGATE) {
|
||||
isLeaf = false;
|
||||
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
|
||||
update.clusterName(), update.prioritizedClusterNames());
|
||||
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
|
||||
for (String cluster : update.prioritizedClusterNames()) {
|
||||
if (newChildStates.containsKey(cluster)) {
|
||||
logger.log(XdsLogLevel.WARNING,
|
||||
String.format("duplicate cluster name %s in aggregate %s is being ignored",
|
||||
cluster, update.clusterName()));
|
||||
continue;
|
||||
}
|
||||
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
|
||||
ClusterState childState;
|
||||
if (clusterStates.containsKey(cluster)) {
|
||||
childState = clusterStates.get(cluster);
|
||||
if (childState.shutdown) {
|
||||
childState.start();
|
||||
}
|
||||
} else {
|
||||
childState = new ClusterState(cluster);
|
||||
clusterStates.put(cluster, childState);
|
||||
childState.start();
|
||||
}
|
||||
newChildStates.put(cluster, childState);
|
||||
} else {
|
||||
newChildStates.put(cluster, childClusterStates.remove(cluster));
|
||||
}
|
||||
}
|
||||
if (childClusterStates != null) { // stop subscribing to revoked child clusters
|
||||
for (ClusterState watcher : childClusterStates.values()) {
|
||||
watcher.shutdown();
|
||||
}
|
||||
}
|
||||
childClusterStates = newChildStates;
|
||||
} else if (update.clusterType() == ClusterType.EDS) {
|
||||
isLeaf = true;
|
||||
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
|
||||
update.clusterName(), update.edsServiceName());
|
||||
} else { // logical DNS
|
||||
isLeaf = true;
|
||||
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
|
||||
}
|
||||
handleClusterDiscovered();
|
||||
}
|
||||
}
|
||||
|
||||
syncContext.execute(new ClusterDiscovered());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -366,8 +366,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
void start() {
|
||||
String resourceName = edsServiceName != null ? edsServiceName : name;
|
||||
logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName);
|
||||
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),
|
||||
resourceName, this, syncContext);
|
||||
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), resourceName, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -453,7 +452,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
new EndpointsUpdated().run();
|
||||
syncContext.execute(new EndpointsUpdated());
|
||||
}
|
||||
|
||||
private List<String> generatePriorityNames(String name,
|
||||
|
@ -492,28 +491,38 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
|
||||
@Override
|
||||
public void onResourceDoesNotExist(final String resourceName) {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
|
||||
status = Status.OK;
|
||||
resolved = true;
|
||||
result = null; // resource revoked
|
||||
handleEndpointResourceUpdate();
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
|
||||
status = Status.OK;
|
||||
resolved = true;
|
||||
result = null; // resource revoked
|
||||
handleEndpointResourceUpdate();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final Status error) {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
String resourceName = edsServiceName != null ? edsServiceName : name;
|
||||
status = Status.UNAVAILABLE
|
||||
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
|
||||
resourceName, error.getCode(), error.getDescription()))
|
||||
.withCause(error.getCause());
|
||||
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
|
||||
handleEndpointResolutionError();
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
String resourceName = edsServiceName != null ? edsServiceName : name;
|
||||
status = Status.UNAVAILABLE
|
||||
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
|
||||
resourceName, error.getCode(), error.getDescription()))
|
||||
.withCause(error.getCause());
|
||||
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
|
||||
handleEndpointResolutionError();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -38,9 +38,9 @@ import io.grpc.SynchronizationContext.ScheduledHandle;
|
|||
import io.grpc.internal.BackoffPolicy;
|
||||
import io.grpc.stub.ClientCallStreamObserver;
|
||||
import io.grpc.stub.ClientResponseObserver;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import io.grpc.xds.Bootstrapper.ServerInfo;
|
||||
import io.grpc.xds.EnvoyProtoData.Node;
|
||||
import io.grpc.xds.XdsClient.ProcessingTracker;
|
||||
import io.grpc.xds.XdsClient.ResourceStore;
|
||||
import io.grpc.xds.XdsClient.XdsResponseHandler;
|
||||
import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
|
||||
|
@ -288,8 +288,6 @@ final class ControlPlaneClient {
|
|||
|
||||
abstract boolean isReady();
|
||||
|
||||
abstract void request(int count);
|
||||
|
||||
/**
|
||||
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
|
||||
* {@code errorDetail}. Used for reacting to a specific discovery response. For
|
||||
|
@ -316,10 +314,7 @@ final class ControlPlaneClient {
|
|||
}
|
||||
responseReceived = true;
|
||||
respNonces.put(type, nonce);
|
||||
ProcessingTracker processingTracker = new ProcessingTracker(() -> request(1), syncContext);
|
||||
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
|
||||
processingTracker);
|
||||
processingTracker.onComplete();
|
||||
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce);
|
||||
}
|
||||
|
||||
final void handleRpcError(Throwable t) {
|
||||
|
@ -377,7 +372,7 @@ final class ControlPlaneClient {
|
|||
}
|
||||
|
||||
private final class AdsStreamV3 extends AbstractAdsStream {
|
||||
private ClientCallStreamObserver<DiscoveryRequest> requestWriter;
|
||||
private StreamObserver<DiscoveryRequest> requestWriter;
|
||||
|
||||
@Override
|
||||
public boolean isReady() {
|
||||
|
@ -385,7 +380,6 @@ final class ControlPlaneClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
void start() {
|
||||
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
|
||||
AggregatedDiscoveryServiceGrpc.newStub(channel);
|
||||
|
@ -395,7 +389,6 @@ final class ControlPlaneClient {
|
|||
|
||||
@Override
|
||||
public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
|
||||
requestStream.disableAutoRequestWithInitial(1);
|
||||
requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler);
|
||||
}
|
||||
|
||||
|
@ -444,8 +437,7 @@ final class ControlPlaneClient {
|
|||
}
|
||||
}
|
||||
|
||||
requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources(
|
||||
new AdsClientResponseObserver());
|
||||
requestWriter = stub.streamAggregatedResources(new AdsClientResponseObserver());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -475,11 +467,6 @@ final class ControlPlaneClient {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void request(int count) {
|
||||
requestWriter.request(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
void sendError(Exception error) {
|
||||
requestWriter.onError(error);
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.google.common.base.Joiner;
|
|||
import com.google.common.base.Splitter;
|
||||
import com.google.common.net.UrlEscapers;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.protobuf.Any;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.xds.Bootstrapper.ServerInfo;
|
||||
|
@ -37,8 +36,6 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
|
@ -307,15 +304,9 @@ abstract class XdsClient {
|
|||
/**
|
||||
* Registers a data watcher for the given Xds resource.
|
||||
*/
|
||||
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
|
||||
ResourceWatcher<T> watcher,
|
||||
Executor executor) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
|
||||
ResourceWatcher<T> watcher) {
|
||||
watchXdsResource(type, resourceName, watcher, MoreExecutors.directExecutor());
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -362,32 +353,11 @@ abstract class XdsClient {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
static final class ProcessingTracker {
|
||||
private final AtomicInteger pendingTask = new AtomicInteger(1);
|
||||
private final Executor executor;
|
||||
private final Runnable completionListener;
|
||||
|
||||
ProcessingTracker(Runnable completionListener, Executor executor) {
|
||||
this.executor = executor;
|
||||
this.completionListener = completionListener;
|
||||
}
|
||||
|
||||
void startTask() {
|
||||
pendingTask.incrementAndGet();
|
||||
}
|
||||
|
||||
void onComplete() {
|
||||
if (pendingTask.decrementAndGet() == 0) {
|
||||
executor.execute(completionListener);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface XdsResponseHandler {
|
||||
/** Called when a xds response is received. */
|
||||
void handleResourceResponse(
|
||||
XdsResourceType<?> resourceType, ServerInfo serverInfo, String versionInfo,
|
||||
List<Any> resources, String nonce, ProcessingTracker processingTracker);
|
||||
List<Any> resources, String nonce);
|
||||
|
||||
/** Called when the ADS stream is closed passively. */
|
||||
// Must be synchronized.
|
||||
|
|
|
@ -54,11 +54,11 @@ import java.net.URI;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.logging.Level;
|
||||
|
@ -166,7 +166,7 @@ final class XdsClientImpl extends XdsClient
|
|||
@Override
|
||||
public void handleResourceResponse(
|
||||
XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo,
|
||||
List<Any> resources, String nonce, ProcessingTracker processingTracker) {
|
||||
List<Any> resources, String nonce) {
|
||||
checkNotNull(xdsResourceType, "xdsResourceType");
|
||||
syncContext.throwIfNotInThisSynchronizationContext();
|
||||
Set<String> toParseResourceNames = null;
|
||||
|
@ -178,7 +178,7 @@ final class XdsClientImpl extends XdsClient
|
|||
XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce,
|
||||
bootstrapInfo, filterRegistry, loadBalancerRegistry, tlsContextManager,
|
||||
toParseResourceNames);
|
||||
handleResourceUpdate(args, resources, xdsResourceType, processingTracker);
|
||||
handleResourceUpdate(args, resources, xdsResourceType);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -189,7 +189,7 @@ final class XdsClientImpl extends XdsClient
|
|||
resourceSubscribers.values()) {
|
||||
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
|
||||
if (!subscriber.hasResult()) {
|
||||
subscriber.onError(error, null);
|
||||
subscriber.onError(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -289,8 +289,7 @@ final class XdsClientImpl extends XdsClient
|
|||
|
||||
@Override
|
||||
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
|
||||
ResourceWatcher<T> watcher,
|
||||
Executor watcherExecutor) {
|
||||
ResourceWatcher<T> watcher) {
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -300,7 +299,7 @@ final class XdsClientImpl extends XdsClient
|
|||
subscribedResourceTypeUrls.put(type.typeUrl(), type);
|
||||
}
|
||||
ResourceSubscriber<T> subscriber =
|
||||
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
|
||||
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
|
||||
if (subscriber == null) {
|
||||
logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName);
|
||||
subscriber = new ResourceSubscriber<>(type, resourceName);
|
||||
|
@ -309,7 +308,7 @@ final class XdsClientImpl extends XdsClient
|
|||
subscriber.xdsChannel.adjustResourceSubscription(type);
|
||||
}
|
||||
}
|
||||
subscriber.addWatcher(watcher, watcherExecutor);
|
||||
subscriber.addWatcher(watcher);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -334,6 +333,7 @@ final class XdsClientImpl extends XdsClient
|
|||
if (resourceSubscribers.get(type).isEmpty()) {
|
||||
resourceSubscribers.remove(type);
|
||||
subscribedResourceTypeUrls.remove(type.typeUrl());
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -420,9 +420,9 @@ final class XdsClientImpl extends XdsClient
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T extends ResourceUpdate> void handleResourceUpdate(
|
||||
XdsResourceType.Args args, List<Any> resources, XdsResourceType<T> xdsResourceType,
|
||||
ProcessingTracker processingTracker) {
|
||||
private <T extends ResourceUpdate> void handleResourceUpdate(XdsResourceType.Args args,
|
||||
List<Any> resources,
|
||||
XdsResourceType<T> xdsResourceType) {
|
||||
ValidatedResourceUpdate<T> result = xdsResourceType.parse(args, resources);
|
||||
logger.log(XdsLogger.XdsLogLevel.INFO,
|
||||
"Received {0} Response version {1} nonce {2}. Parsed resources: {3}",
|
||||
|
@ -449,10 +449,10 @@ final class XdsClientImpl extends XdsClient
|
|||
for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
|
||||
String resourceName = entry.getKey();
|
||||
ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue();
|
||||
|
||||
if (parsedResources.containsKey(resourceName)) {
|
||||
// Happy path: the resource updated successfully. Notify the watchers of the update.
|
||||
subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime,
|
||||
processingTracker);
|
||||
subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -471,7 +471,7 @@ final class XdsClientImpl extends XdsClient
|
|||
// The resource is missing. Reuse the cached resource if possible.
|
||||
if (subscriber.data == null) {
|
||||
// No cached data. Notify the watchers of an invalid update.
|
||||
subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), processingTracker);
|
||||
subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
@ -480,7 +480,7 @@ final class XdsClientImpl extends XdsClient
|
|||
// from the ADS update. Note that we can only do this if the resource update is coming from
|
||||
// the same xDS server that the ResourceSubscriber is subscribed to.
|
||||
if (subscriber.serverInfo.equals(args.serverInfo)) {
|
||||
subscriber.onAbsent(processingTracker);
|
||||
subscriber.onAbsent();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -493,7 +493,7 @@ final class XdsClientImpl extends XdsClient
|
|||
@Nullable private final ControlPlaneClient xdsChannel;
|
||||
private final XdsResourceType<T> type;
|
||||
private final String resource;
|
||||
private final Map<ResourceWatcher<T>, Executor> watchers = new HashMap<>();
|
||||
private final Set<ResourceWatcher<T>> watchers = new HashSet<>();
|
||||
@Nullable private T data;
|
||||
private boolean absent;
|
||||
// Tracks whether the deletion has been ignored per bootstrap server feature.
|
||||
|
@ -553,26 +553,22 @@ final class XdsClientImpl extends XdsClient
|
|||
return bootstrapInfo.servers().get(0); // use first server
|
||||
}
|
||||
|
||||
void addWatcher(ResourceWatcher<T> watcher, Executor watcherExecutor) {
|
||||
checkArgument(!watchers.containsKey(watcher), "watcher %s already registered", watcher);
|
||||
watchers.put(watcher, watcherExecutor);
|
||||
T savedData = data;
|
||||
boolean savedAbsent = absent;
|
||||
watcherExecutor.execute(() -> {
|
||||
if (errorDescription != null) {
|
||||
watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
|
||||
return;
|
||||
}
|
||||
if (savedData != null) {
|
||||
notifyWatcher(watcher, savedData);
|
||||
} else if (savedAbsent) {
|
||||
watcher.onResourceDoesNotExist(resource);
|
||||
}
|
||||
});
|
||||
void addWatcher(ResourceWatcher<T> watcher) {
|
||||
checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher);
|
||||
watchers.add(watcher);
|
||||
if (errorDescription != null) {
|
||||
watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
|
||||
return;
|
||||
}
|
||||
if (data != null) {
|
||||
notifyWatcher(watcher, data);
|
||||
} else if (absent) {
|
||||
watcher.onResourceDoesNotExist(resource);
|
||||
}
|
||||
}
|
||||
|
||||
void removeWatcher(ResourceWatcher<T> watcher) {
|
||||
checkArgument(watchers.containsKey(watcher), "watcher %s not registered", watcher);
|
||||
checkArgument(watchers.contains(watcher), "watcher %s not registered", watcher);
|
||||
watchers.remove(watcher);
|
||||
}
|
||||
|
||||
|
@ -590,7 +586,7 @@ final class XdsClientImpl extends XdsClient
|
|||
logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout",
|
||||
type, resource);
|
||||
respTimer = null;
|
||||
onAbsent(null);
|
||||
onAbsent();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -637,8 +633,7 @@ final class XdsClientImpl extends XdsClient
|
|||
return data != null || absent;
|
||||
}
|
||||
|
||||
void onData(ParsedResource<T> parsedResource, String version, long updateTime,
|
||||
ProcessingTracker processingTracker) {
|
||||
void onData(ParsedResource<T> parsedResource, String version, long updateTime) {
|
||||
if (respTimer != null && respTimer.isPending()) {
|
||||
respTimer.cancel();
|
||||
respTimer = null;
|
||||
|
@ -655,20 +650,13 @@ final class XdsClientImpl extends XdsClient
|
|||
resourceDeletionIgnored = false;
|
||||
}
|
||||
if (!Objects.equals(oldData, data)) {
|
||||
for (ResourceWatcher<T> watcher : watchers.keySet()) {
|
||||
processingTracker.startTask();
|
||||
watchers.get(watcher).execute(() -> {
|
||||
try {
|
||||
notifyWatcher(watcher, data);
|
||||
} finally {
|
||||
processingTracker.onComplete();
|
||||
}
|
||||
});
|
||||
for (ResourceWatcher<T> watcher : watchers) {
|
||||
notifyWatcher(watcher, data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void onAbsent(@Nullable ProcessingTracker processingTracker) {
|
||||
void onAbsent() {
|
||||
if (respTimer != null && respTimer.isPending()) { // too early to conclude absence
|
||||
return;
|
||||
}
|
||||
|
@ -692,24 +680,13 @@ final class XdsClientImpl extends XdsClient
|
|||
data = null;
|
||||
absent = true;
|
||||
metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
|
||||
for (ResourceWatcher<T> watcher : watchers.keySet()) {
|
||||
if (processingTracker != null) {
|
||||
processingTracker.startTask();
|
||||
}
|
||||
watchers.get(watcher).execute(() -> {
|
||||
try {
|
||||
watcher.onResourceDoesNotExist(resource);
|
||||
} finally {
|
||||
if (processingTracker != null) {
|
||||
processingTracker.onComplete();
|
||||
}
|
||||
}
|
||||
});
|
||||
for (ResourceWatcher<T> watcher : watchers) {
|
||||
watcher.onResourceDoesNotExist(resource);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void onError(Status error, @Nullable ProcessingTracker tracker) {
|
||||
void onError(Status error) {
|
||||
if (respTimer != null && respTimer.isPending()) {
|
||||
respTimer.cancel();
|
||||
respTimer = null;
|
||||
|
@ -722,19 +699,8 @@ final class XdsClientImpl extends XdsClient
|
|||
.withDescription(description + "nodeID: " + bootstrapInfo.node().getId())
|
||||
.withCause(error.getCause());
|
||||
|
||||
for (ResourceWatcher<T> watcher : watchers.keySet()) {
|
||||
if (tracker != null) {
|
||||
tracker.startTask();
|
||||
}
|
||||
watchers.get(watcher).execute(() -> {
|
||||
try {
|
||||
watcher.onError(errorAugmented);
|
||||
} finally {
|
||||
if (tracker != null) {
|
||||
tracker.onComplete();
|
||||
}
|
||||
}
|
||||
});
|
||||
for (ResourceWatcher<T> watcher : watchers) {
|
||||
watcher.onError(errorAugmented);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -638,52 +638,66 @@ final class XdsNameResolver extends NameResolver {
|
|||
|
||||
@Override
|
||||
public void onChanged(final LdsUpdate update) {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
|
||||
HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
|
||||
List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
|
||||
String rdsName = httpConnectionManager.rdsName();
|
||||
cleanUpRouteDiscoveryState();
|
||||
if (virtualHosts != null) {
|
||||
updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(),
|
||||
httpConnectionManager.httpFilterConfigs());
|
||||
} else {
|
||||
routeDiscoveryState = new RouteDiscoveryState(
|
||||
rdsName, httpConnectionManager.httpMaxStreamDurationNano(),
|
||||
httpConnectionManager.httpFilterConfigs());
|
||||
logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
|
||||
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
|
||||
rdsName, routeDiscoveryState, syncContext);
|
||||
}
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
|
||||
HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
|
||||
List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
|
||||
String rdsName = httpConnectionManager.rdsName();
|
||||
cleanUpRouteDiscoveryState();
|
||||
if (virtualHosts != null) {
|
||||
updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(),
|
||||
httpConnectionManager.httpFilterConfigs());
|
||||
} else {
|
||||
routeDiscoveryState = new RouteDiscoveryState(
|
||||
rdsName, httpConnectionManager.httpMaxStreamDurationNano(),
|
||||
httpConnectionManager.httpFilterConfigs());
|
||||
logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
|
||||
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
|
||||
rdsName, routeDiscoveryState);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final Status error) {
|
||||
if (stopped || receivedConfig) {
|
||||
return;
|
||||
}
|
||||
listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
|
||||
String.format("Unable to load LDS %s. xDS server returned: %s: %s",
|
||||
ldsResourceName, error.getCode(), error.getDescription())));
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (stopped || receivedConfig) {
|
||||
return;
|
||||
}
|
||||
listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
|
||||
String.format("Unable to load LDS %s. xDS server returned: %s: %s",
|
||||
ldsResourceName, error.getCode(), error.getDescription())));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResourceDoesNotExist(final String resourceName) {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
String error = "LDS resource does not exist: " + resourceName;
|
||||
logger.log(XdsLogLevel.INFO, error);
|
||||
cleanUpRouteDiscoveryState();
|
||||
cleanUpRoutes(error);
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
String error = "LDS resource does not exist: " + resourceName;
|
||||
logger.log(XdsLogLevel.INFO, error);
|
||||
cleanUpRouteDiscoveryState();
|
||||
cleanUpRoutes(error);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void start() {
|
||||
logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName);
|
||||
xdsClient.watchXdsResource(XdsListenerResource.getInstance(),
|
||||
ldsResourceName, this, syncContext);
|
||||
xdsClient.watchXdsResource(XdsListenerResource.getInstance(), ldsResourceName, this);
|
||||
}
|
||||
|
||||
private void stop() {
|
||||
|
@ -851,31 +865,47 @@ final class XdsNameResolver extends NameResolver {
|
|||
|
||||
@Override
|
||||
public void onChanged(final RdsUpdate update) {
|
||||
if (RouteDiscoveryState.this != routeDiscoveryState) {
|
||||
return;
|
||||
}
|
||||
logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update);
|
||||
updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, filterConfigs);
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (RouteDiscoveryState.this != routeDiscoveryState) {
|
||||
return;
|
||||
}
|
||||
logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update);
|
||||
updateRoutes(update.virtualHosts, httpMaxStreamDurationNano,
|
||||
filterConfigs);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final Status error) {
|
||||
if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) {
|
||||
return;
|
||||
}
|
||||
listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
|
||||
String.format("Unable to load RDS %s. xDS server returned: %s: %s",
|
||||
resourceName, error.getCode(), error.getDescription())));
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) {
|
||||
return;
|
||||
}
|
||||
listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
|
||||
String.format("Unable to load RDS %s. xDS server returned: %s: %s",
|
||||
resourceName, error.getCode(), error.getDescription())));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResourceDoesNotExist(final String resourceName) {
|
||||
if (RouteDiscoveryState.this != routeDiscoveryState) {
|
||||
return;
|
||||
}
|
||||
String error = "RDS resource does not exist: " + resourceName;
|
||||
logger.log(XdsLogLevel.INFO, error);
|
||||
cleanUpRoutes(error);
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (RouteDiscoveryState.this != routeDiscoveryState) {
|
||||
return;
|
||||
}
|
||||
String error = "RDS resource does not exist: " + resourceName;
|
||||
logger.log(XdsLogLevel.INFO, error);
|
||||
cleanUpRoutes(error);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -366,78 +366,92 @@ final class XdsServerWrapper extends Server {
|
|||
|
||||
private DiscoveryState(String resourceName) {
|
||||
this.resourceName = checkNotNull(resourceName, "resourceName");
|
||||
xdsClient.watchXdsResource(
|
||||
XdsListenerResource.getInstance(), resourceName, this, syncContext);
|
||||
xdsClient.watchXdsResource(XdsListenerResource.getInstance(), resourceName, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onChanged(final LdsUpdate update) {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
logger.log(Level.FINEST, "Received Lds update {0}", update);
|
||||
checkNotNull(update.listener(), "update");
|
||||
if (!pendingRds.isEmpty()) {
|
||||
// filter chain state has not yet been applied to filterChainSelectorManager and there
|
||||
// are two sets of sslContextProviderSuppliers, so we release the old ones.
|
||||
releaseSuppliersInFlight();
|
||||
pendingRds.clear();
|
||||
}
|
||||
filterChains = update.listener().filterChains();
|
||||
defaultFilterChain = update.listener().defaultFilterChain();
|
||||
List<FilterChain> allFilterChains = filterChains;
|
||||
if (defaultFilterChain != null) {
|
||||
allFilterChains = new ArrayList<>(filterChains);
|
||||
allFilterChains.add(defaultFilterChain);
|
||||
}
|
||||
Set<String> allRds = new HashSet<>();
|
||||
for (FilterChain filterChain : allFilterChains) {
|
||||
HttpConnectionManager hcm = filterChain.httpConnectionManager();
|
||||
if (hcm.virtualHosts() == null) {
|
||||
RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName());
|
||||
if (rdsState == null) {
|
||||
rdsState = new RouteDiscoveryState(hcm.rdsName());
|
||||
routeDiscoveryStates.put(hcm.rdsName(), rdsState);
|
||||
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
|
||||
hcm.rdsName(), rdsState, syncContext);
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
if (rdsState.isPending) {
|
||||
pendingRds.add(hcm.rdsName());
|
||||
logger.log(Level.FINEST, "Received Lds update {0}", update);
|
||||
checkNotNull(update.listener(), "update");
|
||||
if (!pendingRds.isEmpty()) {
|
||||
// filter chain state has not yet been applied to filterChainSelectorManager and there
|
||||
// are two sets of sslContextProviderSuppliers, so we release the old ones.
|
||||
releaseSuppliersInFlight();
|
||||
pendingRds.clear();
|
||||
}
|
||||
filterChains = update.listener().filterChains();
|
||||
defaultFilterChain = update.listener().defaultFilterChain();
|
||||
List<FilterChain> allFilterChains = filterChains;
|
||||
if (defaultFilterChain != null) {
|
||||
allFilterChains = new ArrayList<>(filterChains);
|
||||
allFilterChains.add(defaultFilterChain);
|
||||
}
|
||||
Set<String> allRds = new HashSet<>();
|
||||
for (FilterChain filterChain : allFilterChains) {
|
||||
HttpConnectionManager hcm = filterChain.httpConnectionManager();
|
||||
if (hcm.virtualHosts() == null) {
|
||||
RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName());
|
||||
if (rdsState == null) {
|
||||
rdsState = new RouteDiscoveryState(hcm.rdsName());
|
||||
routeDiscoveryStates.put(hcm.rdsName(), rdsState);
|
||||
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
|
||||
hcm.rdsName(), rdsState);
|
||||
}
|
||||
if (rdsState.isPending) {
|
||||
pendingRds.add(hcm.rdsName());
|
||||
}
|
||||
allRds.add(hcm.rdsName());
|
||||
}
|
||||
}
|
||||
for (Map.Entry<String, RouteDiscoveryState> entry: routeDiscoveryStates.entrySet()) {
|
||||
if (!allRds.contains(entry.getKey())) {
|
||||
xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(),
|
||||
entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
routeDiscoveryStates.keySet().retainAll(allRds);
|
||||
if (pendingRds.isEmpty()) {
|
||||
updateSelector();
|
||||
}
|
||||
allRds.add(hcm.rdsName());
|
||||
}
|
||||
}
|
||||
for (Map.Entry<String, RouteDiscoveryState> entry: routeDiscoveryStates.entrySet()) {
|
||||
if (!allRds.contains(entry.getKey())) {
|
||||
xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(),
|
||||
entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
routeDiscoveryStates.keySet().retainAll(allRds);
|
||||
if (pendingRds.isEmpty()) {
|
||||
updateSelector();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResourceDoesNotExist(final String resourceName) {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
StatusException statusException = Status.UNAVAILABLE.withDescription(
|
||||
"Listener " + resourceName + " unavailable").asException();
|
||||
handleConfigNotFound(statusException);
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
StatusException statusException = Status.UNAVAILABLE.withDescription(
|
||||
"Listener " + resourceName + " unavailable").asException();
|
||||
handleConfigNotFound(statusException);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final Status error) {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
logger.log(Level.FINE, "Error from XdsClient", error);
|
||||
if (!isServing) {
|
||||
listener.onNotServing(error.asException());
|
||||
}
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
logger.log(Level.FINE, "Error from XdsClient", error);
|
||||
if (!isServing) {
|
||||
listener.onNotServing(error.asException());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void shutdown() {
|
||||
|
|
|
@ -65,7 +65,6 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import javax.annotation.Nullable;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -665,7 +664,7 @@ public class CdsLoadBalancer2Test {
|
|||
outlierDetection)
|
||||
.lbPolicyConfig(ImmutableMap.of("unknown", ImmutableMap.of("foo", "bar"))).build());
|
||||
} catch (Exception e) {
|
||||
assertThat(e).hasMessageThat().contains("No provider available");
|
||||
assertThat(e).hasCauseThat().hasMessageThat().contains("No provider available");
|
||||
return;
|
||||
}
|
||||
fail("Expected the unknown LB to cause an exception");
|
||||
|
@ -680,7 +679,7 @@ public class CdsLoadBalancer2Test {
|
|||
ImmutableMap.of("ring_hash_experimental", ImmutableMap.of("minRingSize", "-1")))
|
||||
.build());
|
||||
} catch (Exception e) {
|
||||
assertThat(e).hasMessageThat().contains("Unable to parse");
|
||||
assertThat(e).hasCauseThat().hasMessageThat().contains("Unable to parse");
|
||||
return;
|
||||
}
|
||||
fail("Expected the invalid config to cause an exception");
|
||||
|
@ -790,7 +789,7 @@ public class CdsLoadBalancer2Test {
|
|||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
|
||||
ResourceWatcher<T> watcher, Executor syncContext) {
|
||||
ResourceWatcher<T> watcher) {
|
||||
assertThat(type.typeName()).isEqualTo("CDS");
|
||||
watchers.computeIfAbsent(resourceName, k -> new ArrayList<>())
|
||||
.add((ResourceWatcher<CdsUpdate>)watcher);
|
||||
|
|
|
@ -88,7 +88,6 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.annotation.Nullable;
|
||||
import org.junit.After;
|
||||
|
@ -1182,11 +1181,11 @@ public class ClusterResolverLoadBalancerTest {
|
|||
private static final class FakeXdsClient extends XdsClient {
|
||||
private final Map<String, ResourceWatcher<EdsUpdate>> watchers = new HashMap<>();
|
||||
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
|
||||
ResourceWatcher<T> watcher,
|
||||
Executor syncContext) {
|
||||
ResourceWatcher<T> watcher) {
|
||||
assertThat(type.typeName()).isEqualTo("EDS");
|
||||
assertThat(watchers).doesNotContainKey(resourceName);
|
||||
watchers.put(resourceName, (ResourceWatcher<EdsUpdate>) watcher);
|
||||
|
|
|
@ -20,10 +20,7 @@ import static com.google.common.truth.Truth.assertThat;
|
|||
import static com.google.common.truth.Truth.assertWithMessage;
|
||||
import static io.grpc.xds.XdsClientImpl.XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.isA;
|
||||
import static org.mockito.Mockito.atLeastOnce;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
@ -107,9 +104,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.BlockingDeque;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -127,10 +121,8 @@ import org.mockito.Captor;
|
|||
import org.mockito.InOrder;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.junit.MockitoJUnit;
|
||||
import org.mockito.junit.MockitoRule;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
/**
|
||||
* Tests for {@link XdsClientImpl}.
|
||||
|
@ -2959,125 +2951,6 @@ public abstract class XdsClientImplTestBase {
|
|||
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void flowControlAbsent() throws Exception {
|
||||
String anotherCdsResource = CDS_RESOURCE + "2";
|
||||
FakeClock fakeWatchClock = new FakeClock();
|
||||
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CDS_RESOURCE,
|
||||
cdsResourceWatcher, fakeWatchClock.getScheduledExecutorService());
|
||||
ResourceWatcher<CdsUpdate> anotherWatcher = mock(ResourceWatcher.class);
|
||||
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), anotherCdsResource,
|
||||
anotherWatcher, fakeWatchClock.getScheduledExecutorService());
|
||||
verifyResourceMetadataRequested(CDS, CDS_RESOURCE);
|
||||
verifyResourceMetadataRequested(CDS, anotherCdsResource);
|
||||
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
|
||||
call.verifyRequest(CDS, Arrays.asList(CDS_RESOURCE, anotherCdsResource), "", "", NODE);
|
||||
assertThat(fakeWatchClock.runDueTasks()).isEqualTo(2);
|
||||
call.sendResponse(CDS, testClusterRoundRobin, VERSION_1, "0000");
|
||||
verifyResourceMetadataAcked(
|
||||
CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1, TIME_INCREMENT);
|
||||
call.verifyRequest(CDS, Arrays.asList(CDS_RESOURCE, anotherCdsResource), VERSION_1,
|
||||
"0000", NODE);
|
||||
verifyNoInteractions(cdsResourceWatcher, anotherWatcher);
|
||||
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
|
||||
assertThat(fakeWatchClock.getPendingTasks().size()).isEqualTo(2);
|
||||
CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
doAnswer(blockUpdate(barrier)).when(cdsResourceWatcher).onChanged(any(CdsUpdate.class));
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
new Thread(() -> {
|
||||
try {
|
||||
fakeWatchClock.runDueTasks();
|
||||
latch.countDown();
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}).start();
|
||||
ImmutableMap<String, Any> resourcesV2 = ImmutableMap.of(
|
||||
CDS_RESOURCE, Any.pack(mf.buildEdsCluster(CDS_RESOURCE, "A.2", "round_robin", null,
|
||||
null, false, null,
|
||||
"envoy.transport_sockets.tls", null, null
|
||||
)),
|
||||
anotherCdsResource, Any.pack(mf.buildClusterInvalid(anotherCdsResource)));
|
||||
call.sendResponse(CDS, resourcesV2.values().asList(), VERSION_2, "0001");
|
||||
assertThat(call.isReady()).isFalse();
|
||||
verifyResourceMetadataAcked(
|
||||
CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1, TIME_INCREMENT);
|
||||
barrier.await();
|
||||
verify(cdsResourceWatcher, atLeastOnce()).onChanged(any());
|
||||
String errorMsg = "CDS response Cluster 'cluster.googleapis.com2' validation error: "
|
||||
+ "Cluster cluster.googleapis.com2: unspecified cluster discovery type";
|
||||
call.verifyRequestNack(CDS, Arrays.asList(CDS_RESOURCE, anotherCdsResource), VERSION_1, "0001",
|
||||
NODE, Arrays.asList(errorMsg));
|
||||
verify(anotherWatcher).onResourceDoesNotExist(eq(anotherCdsResource));
|
||||
barrier.await();
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
verify(cdsResourceWatcher, times(2)).onChanged(any());
|
||||
verify(anotherWatcher).onError(any());
|
||||
}
|
||||
|
||||
private Answer<Void> blockUpdate(CyclicBarrier barrier) {
|
||||
return new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
barrier.await();
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void simpleFlowControl() throws Exception {
|
||||
FakeClock fakeWatchClock = new FakeClock();
|
||||
DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE,
|
||||
edsResourceWatcher, fakeWatchClock.getScheduledExecutorService());
|
||||
verifyResourceMetadataRequested(EDS, EDS_RESOURCE);
|
||||
assertThat(fakeWatchClock.runDueTasks()).isEqualTo(1);
|
||||
|
||||
call.sendResponse(EDS, testClusterLoadAssignment, VERSION_1, "0000");
|
||||
call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE);
|
||||
verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1,
|
||||
TIME_INCREMENT);
|
||||
verifyNoInteractions(edsResourceWatcher);
|
||||
assertThat(fakeWatchClock.getPendingTasks().size()).isEqualTo(1);
|
||||
|
||||
// Updated EDS response.
|
||||
Any updatedClusterLoadAssignment = Any.pack(mf.buildClusterLoadAssignment(EDS_RESOURCE,
|
||||
ImmutableList.of(mf.buildLocalityLbEndpoints("region2", "zone2", "subzone2",
|
||||
mf.buildLbEndpoint("172.44.2.2", 8000, "unknown", 3), 2, 0)),
|
||||
ImmutableList.<Message>of()));
|
||||
call.sendResponse(EDS, updatedClusterLoadAssignment, VERSION_2, "0001");
|
||||
// message not processed due to flow control
|
||||
call.verifyNoMoreRequest();
|
||||
assertThat(call.isReady()).isFalse();
|
||||
|
||||
CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
doAnswer(blockUpdate(barrier)).when(edsResourceWatcher).onChanged(any(EdsUpdate.class));
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
new Thread(() -> {
|
||||
try {
|
||||
fakeWatchClock.runDueTasks();
|
||||
latch.countDown();
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}).start();
|
||||
|
||||
verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1,
|
||||
TIME_INCREMENT);
|
||||
barrier.await();
|
||||
verify(edsResourceWatcher, atLeastOnce()).onChanged(edsUpdateCaptor.capture());
|
||||
EdsUpdate edsUpdate = edsUpdateCaptor.getAllValues().get(0);
|
||||
validateGoldenClusterLoadAssignment(edsUpdate);
|
||||
barrier.await();
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
verify(edsResourceWatcher, times(2)).onChanged(any());
|
||||
verifyResourceMetadataAcked(EDS, EDS_RESOURCE, updatedClusterLoadAssignment, VERSION_2,
|
||||
TIME_INCREMENT * 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void edsResourceUpdated() {
|
||||
DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE,
|
||||
|
@ -3768,29 +3641,31 @@ public abstract class XdsClientImplTestBase {
|
|||
}
|
||||
|
||||
private <T extends ResourceUpdate> DiscoveryRpcCall startResourceWatcher(
|
||||
XdsResourceType<T> type, String name, ResourceWatcher<T> watcher, Executor executor) {
|
||||
xdsClient.watchXdsResource(type, name, watcher, executor);
|
||||
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
|
||||
assertThat(call).isNotNull();
|
||||
call.verifyRequest(type, Collections.singletonList(name), "", "", NODE);
|
||||
|
||||
XdsResourceType<T> type, String name, ResourceWatcher<T> watcher) {
|
||||
FakeClock.TaskFilter timeoutTaskFilter;
|
||||
switch (type.typeName()) {
|
||||
case "LDS":
|
||||
timeoutTaskFilter = LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER;
|
||||
xdsClient.watchXdsResource(type, name, watcher);
|
||||
break;
|
||||
case "RDS":
|
||||
timeoutTaskFilter = RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER;
|
||||
xdsClient.watchXdsResource(type, name, watcher);
|
||||
break;
|
||||
case "CDS":
|
||||
timeoutTaskFilter = CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER;
|
||||
xdsClient.watchXdsResource(type, name, watcher);
|
||||
break;
|
||||
case "EDS":
|
||||
timeoutTaskFilter = EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER;
|
||||
xdsClient.watchXdsResource(type, name, watcher);
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("should never be here");
|
||||
}
|
||||
|
||||
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
|
||||
call.verifyRequest(type, Collections.singletonList(name), "", "", NODE);
|
||||
ScheduledTask timeoutTask =
|
||||
Iterables.getOnlyElement(fakeClock.getPendingTasks(timeoutTaskFilter));
|
||||
assertThat(timeoutTask.getDelay(TimeUnit.SECONDS))
|
||||
|
@ -3798,11 +3673,6 @@ public abstract class XdsClientImplTestBase {
|
|||
return call;
|
||||
}
|
||||
|
||||
private <T extends ResourceUpdate> DiscoveryRpcCall startResourceWatcher(
|
||||
XdsResourceType<T> type, String name, ResourceWatcher<T> watcher) {
|
||||
return startResourceWatcher(type, name, watcher, MoreExecutors.directExecutor());
|
||||
}
|
||||
|
||||
protected abstract static class DiscoveryRpcCall {
|
||||
|
||||
protected void verifyRequest(
|
||||
|
@ -3849,10 +3719,6 @@ public abstract class XdsClientImplTestBase {
|
|||
protected void sendCompleted() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
protected boolean isReady() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract static class LrsRpcCall {
|
||||
|
|
|
@ -98,7 +98,6 @@ import io.grpc.BindableService;
|
|||
import io.grpc.Context;
|
||||
import io.grpc.Context.CancellationListener;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.stub.ServerCallStreamObserver;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -248,11 +247,6 @@ public class XdsClientImplV3Test extends XdsClientImplTestBase {
|
|||
protected void sendCompleted() {
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isReady() {
|
||||
return ((ServerCallStreamObserver)responseObserver).isReady();
|
||||
}
|
||||
}
|
||||
|
||||
private static class LrsRpcCallV3 extends LrsRpcCall {
|
||||
|
|
|
@ -96,7 +96,6 @@ import java.util.Arrays;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -1916,9 +1915,8 @@ public class XdsNameResolverTest {
|
|||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType,
|
||||
String resourceName,
|
||||
ResourceWatcher<T> watcher,
|
||||
Executor syncContext) {
|
||||
String resourceName,
|
||||
ResourceWatcher<T> watcher) {
|
||||
|
||||
switch (resourceType.typeName()) {
|
||||
case "LDS":
|
||||
|
@ -1961,10 +1959,8 @@ public class XdsNameResolverTest {
|
|||
}
|
||||
|
||||
void deliverLdsUpdate(long httpMaxStreamDurationNano, List<VirtualHost> virtualHosts) {
|
||||
syncContext.execute(() -> {
|
||||
ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts(
|
||||
httpMaxStreamDurationNano, virtualHosts, null)));
|
||||
});
|
||||
ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts(
|
||||
httpMaxStreamDurationNano, virtualHosts, null)));
|
||||
}
|
||||
|
||||
void deliverLdsUpdate(final List<Route> routes) {
|
||||
|
@ -1972,10 +1968,8 @@ public class XdsNameResolverTest {
|
|||
VirtualHost.create(
|
||||
"virtual-host", Collections.singletonList(expectedLdsResourceName), routes,
|
||||
ImmutableMap.of());
|
||||
syncContext.execute(() -> {
|
||||
ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts(
|
||||
0L, Collections.singletonList(virtualHost), null)));
|
||||
});
|
||||
ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts(
|
||||
0L, Collections.singletonList(virtualHost), null)));
|
||||
}
|
||||
|
||||
void deliverLdsUpdateWithFaultInjection(
|
||||
|
@ -2019,10 +2013,8 @@ public class XdsNameResolverTest {
|
|||
Collections.singletonList(expectedLdsResourceName),
|
||||
Collections.singletonList(route),
|
||||
overrideConfig);
|
||||
syncContext.execute(() -> {
|
||||
ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts(
|
||||
0L, Collections.singletonList(virtualHost), filterChain)));
|
||||
});
|
||||
ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts(
|
||||
0L, Collections.singletonList(virtualHost), filterChain)));
|
||||
}
|
||||
|
||||
void deliverLdsUpdateForRdsNameWithFaultInjection(
|
||||
|
@ -2034,23 +2026,17 @@ public class XdsNameResolverTest {
|
|||
ImmutableList<NamedFilterConfig> filterChain = ImmutableList.of(
|
||||
new NamedFilterConfig(FAULT_FILTER_INSTANCE_NAME, httpFilterFaultConfig),
|
||||
new NamedFilterConfig(ROUTER_FILTER_INSTANCE_NAME, RouterFilter.ROUTER_CONFIG));
|
||||
syncContext.execute(() -> {
|
||||
ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forRdsName(
|
||||
0L, rdsName, filterChain)));
|
||||
});
|
||||
ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forRdsName(
|
||||
0L, rdsName, filterChain)));
|
||||
}
|
||||
|
||||
void deliverLdsUpdateForRdsName(String rdsName) {
|
||||
syncContext.execute(() -> {
|
||||
ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forRdsName(
|
||||
0, rdsName, null)));
|
||||
});
|
||||
ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forRdsName(
|
||||
0, rdsName, null)));
|
||||
}
|
||||
|
||||
void deliverLdsResourceNotFound() {
|
||||
syncContext.execute(() -> {
|
||||
ldsWatcher.onResourceDoesNotExist(expectedLdsResourceName);
|
||||
});
|
||||
ldsWatcher.onResourceDoesNotExist(expectedLdsResourceName);
|
||||
}
|
||||
|
||||
void deliverRdsUpdateWithFaultInjection(
|
||||
|
@ -2086,39 +2072,29 @@ public class XdsNameResolverTest {
|
|||
Collections.singletonList(expectedLdsResourceName),
|
||||
Collections.singletonList(route),
|
||||
overrideConfig);
|
||||
syncContext.execute(() -> {
|
||||
rdsWatcher.onChanged(new RdsUpdate(Collections.singletonList(virtualHost)));
|
||||
});
|
||||
rdsWatcher.onChanged(new RdsUpdate(Collections.singletonList(virtualHost)));
|
||||
}
|
||||
|
||||
void deliverRdsUpdate(String resourceName, List<VirtualHost> virtualHosts) {
|
||||
if (!resourceName.equals(rdsResource)) {
|
||||
return;
|
||||
}
|
||||
syncContext.execute(() -> {
|
||||
rdsWatcher.onChanged(new RdsUpdate(virtualHosts));
|
||||
});
|
||||
rdsWatcher.onChanged(new RdsUpdate(virtualHosts));
|
||||
}
|
||||
|
||||
void deliverRdsResourceNotFound(String resourceName) {
|
||||
if (!resourceName.equals(rdsResource)) {
|
||||
return;
|
||||
}
|
||||
syncContext.execute(() -> {
|
||||
rdsWatcher.onResourceDoesNotExist(rdsResource);
|
||||
});
|
||||
rdsWatcher.onResourceDoesNotExist(rdsResource);
|
||||
}
|
||||
|
||||
void deliverError(final Status error) {
|
||||
if (ldsWatcher != null) {
|
||||
syncContext.execute(() -> {
|
||||
ldsWatcher.onError(error);
|
||||
});
|
||||
ldsWatcher.onError(error);
|
||||
}
|
||||
if (rdsWatcher != null) {
|
||||
syncContext.execute(() -> {
|
||||
rdsWatcher.onError(error);
|
||||
});
|
||||
rdsWatcher.onError(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,7 +39,6 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
|
@ -183,8 +182,7 @@ public class XdsServerTestHelper {
|
|||
@SuppressWarnings("unchecked")
|
||||
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType,
|
||||
String resourceName,
|
||||
ResourceWatcher<T> watcher,
|
||||
Executor syncContext) {
|
||||
ResourceWatcher<T> watcher) {
|
||||
switch (resourceType.typeName()) {
|
||||
case "LDS":
|
||||
assertThat(ldsWatcher).isNull();
|
||||
|
|
|
@ -47,7 +47,6 @@ import io.grpc.ServerCallHandler;
|
|||
import io.grpc.ServerInterceptor;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusException;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.internal.FakeClock;
|
||||
import io.grpc.testing.TestMethodDescriptors;
|
||||
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
|
||||
|
@ -153,8 +152,7 @@ public class XdsServerWrapperTest {
|
|||
verify(xdsClient, timeout(5000)).watchXdsResource(
|
||||
eq(listenerResource),
|
||||
eq("grpc/server?udpa.resource.listening_address=[::FFFF:129.144.52.38]:80"),
|
||||
any(ResourceWatcher.class),
|
||||
any(SynchronizationContext.class));
|
||||
any(ResourceWatcher.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -226,8 +224,7 @@ public class XdsServerWrapperTest {
|
|||
eq(listenerResource),
|
||||
eq("xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/server/"
|
||||
+ "%5B::FFFF:129.144.52.38%5D:80"),
|
||||
any(ResourceWatcher.class),
|
||||
any(SynchronizationContext.class));
|
||||
any(ResourceWatcher.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue