xds: Plumb the Cluster's filterMetadata to RPCs

This will be used by CSM observability, and may get exposed to further
uses in the future.
This commit is contained in:
Eric Anderson 2024-05-24 15:08:36 -07:00 committed by GitHub
parent 018917ae59
commit 75fa441fc9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 124 additions and 60 deletions

View File

@ -176,13 +176,15 @@ final class CdsLoadBalancer2 extends LoadBalancer {
clusterState.result.lrsServerInfo(),
clusterState.result.maxConcurrentRequests(),
clusterState.result.upstreamTlsContext(),
clusterState.result.filterMetadata(),
clusterState.result.outlierDetection());
} else { // logical DNS
instance = DiscoveryMechanism.forLogicalDns(
clusterState.name, clusterState.result.dnsHostName(),
clusterState.result.lrsServerInfo(),
clusterState.result.maxConcurrentRequests(),
clusterState.result.upstreamTlsContext());
clusterState.result.upstreamTlsContext(),
clusterState.result.filterMetadata());
}
instances.add(instance);
}

View File

@ -21,6 +21,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Struct;
import io.grpc.Attributes;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
@ -54,6 +56,7 @@ import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@ -140,6 +143,7 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
childLbHelper.updateDropPolicies(config.dropCategories);
childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
childLbHelper.updateFilterMetadata(config.filterMetadata);
childSwitchLb.switchTo(config.childPolicy.getProvider());
childSwitchLb.handleResolvedAddresses(
@ -189,6 +193,7 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
@Nullable
private SslContextProviderSupplier sslContextProviderSupplier;
private Map<String, Struct> filterMetadata = ImmutableMap.of();
@Nullable
private final ServerInfo lrsServerInfo;
@ -201,8 +206,8 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
currentState = newState;
currentPicker = newPicker;
SubchannelPicker picker =
new RequestLimitingSubchannelPicker(newPicker, dropPolicies, maxConcurrentRequests);
SubchannelPicker picker = new RequestLimitingSubchannelPicker(
newPicker, dropPolicies, maxConcurrentRequests, filterMetadata);
delegate().updateBalancingState(newState, picker);
}
@ -311,20 +316,29 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
: null;
}
private void updateFilterMetadata(Map<String, Struct> filterMetadata) {
this.filterMetadata = ImmutableMap.copyOf(filterMetadata);
}
private class RequestLimitingSubchannelPicker extends SubchannelPicker {
private final SubchannelPicker delegate;
private final List<DropOverload> dropPolicies;
private final long maxConcurrentRequests;
private final Map<String, Struct> filterMetadata;
private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
List<DropOverload> dropPolicies, long maxConcurrentRequests) {
List<DropOverload> dropPolicies, long maxConcurrentRequests,
Map<String, Struct> filterMetadata) {
this.delegate = delegate;
this.dropPolicies = dropPolicies;
this.maxConcurrentRequests = maxConcurrentRequests;
this.filterMetadata = checkNotNull(filterMetadata, "filterMetadata");
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
args.getCallOptions().getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER)
.accept(filterMetadata);
for (DropOverload dropOverload : dropPolicies) {
int rand = random.nextInt(1_000_000);
if (rand < dropOverload.dropsPerMillion()) {

View File

@ -19,6 +19,9 @@ package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Struct;
import io.grpc.CallOptions;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
@ -34,6 +37,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Nullable;
/**
@ -43,6 +47,11 @@ import javax.annotation.Nullable;
*/
@Internal
public final class ClusterImplLoadBalancerProvider extends LoadBalancerProvider {
/**
* Consumer of filter metadata from the cluster used by the call. Consumer may not modify map.
*/
public static final CallOptions.Key<Consumer<Map<String, Struct>>> FILTER_METADATA_CONSUMER =
CallOptions.Key.createWithDefault("io.grpc.xds.internalFilterMetadataConsumer", (m) -> { });
@Override
public boolean isAvailable() {
@ -89,16 +98,18 @@ public final class ClusterImplLoadBalancerProvider extends LoadBalancerProvider
final List<DropOverload> dropCategories;
// Provides the direct child policy and its config.
final PolicySelection childPolicy;
final Map<String, Struct> filterMetadata;
ClusterImplConfig(String cluster, @Nullable String edsServiceName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
List<DropOverload> dropCategories, PolicySelection childPolicy,
@Nullable UpstreamTlsContext tlsContext) {
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata) {
this.cluster = checkNotNull(cluster, "cluster");
this.edsServiceName = edsServiceName;
this.lrsServerInfo = lrsServerInfo;
this.maxConcurrentRequests = maxConcurrentRequests;
this.tlsContext = tlsContext;
this.filterMetadata = ImmutableMap.copyOf(filterMetadata);
this.dropCategories = Collections.unmodifiableList(
new ArrayList<>(checkNotNull(dropCategories, "dropCategories")));
this.childPolicy = checkNotNull(childPolicy, "childPolicy");

View File

@ -21,6 +21,8 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Struct;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalLogId;
@ -184,10 +186,11 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
if (instance.type == DiscoveryMechanism.Type.EDS) {
state = new EdsClusterState(instance.cluster, instance.edsServiceName,
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
instance.outlierDetection);
instance.filterMetadata, instance.outlierDetection);
} else { // logical DNS
state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName,
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext);
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
instance.filterMetadata);
}
clusterStates.put(instance.cluster, state);
state.start();
@ -323,6 +326,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
protected final Long maxConcurrentRequests;
@Nullable
protected final UpstreamTlsContext tlsContext;
protected final Map<String, Struct> filterMetadata;
@Nullable
protected final OutlierDetection outlierDetection;
// Resolution status, may contain most recent error encountered.
@ -337,11 +341,12 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
private ClusterState(String name, @Nullable ServerInfo lrsServerInfo,
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
@Nullable OutlierDetection outlierDetection) {
Map<String, Struct> filterMetadata, @Nullable OutlierDetection outlierDetection) {
this.name = name;
this.lrsServerInfo = lrsServerInfo;
this.maxConcurrentRequests = maxConcurrentRequests;
this.tlsContext = tlsContext;
this.filterMetadata = ImmutableMap.copyOf(filterMetadata);
this.outlierDetection = outlierDetection;
}
@ -360,8 +365,10 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
private EdsClusterState(String name, @Nullable String edsServiceName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext, @Nullable OutlierDetection outlierDetection) {
super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, outlierDetection);
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata,
@Nullable OutlierDetection outlierDetection) {
super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
outlierDetection);
this.edsServiceName = edsServiceName;
}
@ -447,8 +454,8 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
Map<String, PriorityChildConfig> priorityChildConfigs =
generateEdsBasedPriorityChildConfigs(
name, edsServiceName, lrsServerInfo, maxConcurrentRequests, tlsContext,
outlierDetection, endpointLbPolicy, lbRegistry, prioritizedLocalityWeights,
dropOverloads);
filterMetadata, outlierDetection, endpointLbPolicy, lbRegistry,
prioritizedLocalityWeights, dropOverloads);
status = Status.OK;
resolved = true;
result = new ClusterResolutionResult(addresses, priorityChildConfigs,
@ -533,8 +540,8 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
private LogicalDnsClusterState(String name, String dnsHostName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext) {
super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, null);
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata) {
super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null);
this.dnsHostName = checkNotNull(dnsHostName, "dnsHostName");
nameResolverFactory =
checkNotNull(helper.getNameResolverRegistry().asFactory(), "nameResolverFactory");
@ -623,8 +630,8 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
addresses.add(eag);
}
PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
name, lrsServerInfo, maxConcurrentRequests, tlsContext, lbRegistry,
Collections.<DropOverload>emptyList());
name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
lbRegistry, Collections.<DropOverload>emptyList());
status = Status.OK;
resolved = true;
result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
@ -707,14 +714,14 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
*/
private static PriorityChildConfig generateDnsBasedPriorityChildConfig(
String cluster, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext, LoadBalancerRegistry lbRegistry,
List<DropOverload> dropOverloads) {
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata,
LoadBalancerRegistry lbRegistry, List<DropOverload> dropOverloads) {
// Override endpoint-level LB policy with pick_first for logical DNS cluster.
PolicySelection endpointLbPolicy =
new PolicySelection(lbRegistry.getProvider("pick_first"), null);
ClusterImplConfig clusterImplConfig =
new ClusterImplConfig(cluster, null, lrsServerInfo, maxConcurrentRequests,
dropOverloads, endpointLbPolicy, tlsContext);
dropOverloads, endpointLbPolicy, tlsContext, filterMetadata);
LoadBalancerProvider clusterImplLbProvider =
lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
PolicySelection clusterImplPolicy =
@ -731,6 +738,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
private static Map<String, PriorityChildConfig> generateEdsBasedPriorityChildConfigs(
String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo,
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
Map<String, Struct> filterMetadata,
@Nullable OutlierDetection outlierDetection, PolicySelection endpointLbPolicy,
LoadBalancerRegistry lbRegistry, Map<String,
Map<Locality, Integer>> prioritizedLocalityWeights, List<DropOverload> dropOverloads) {
@ -738,7 +746,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
for (String priority : prioritizedLocalityWeights.keySet()) {
ClusterImplConfig clusterImplConfig =
new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests,
dropOverloads, endpointLbPolicy, tlsContext);
dropOverloads, endpointLbPolicy, tlsContext, filterMetadata);
LoadBalancerProvider clusterImplLbProvider =
lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
PolicySelection priorityChildPolicy =

View File

@ -19,6 +19,8 @@ package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Struct;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
@ -129,6 +131,7 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
final String dnsHostName;
@Nullable
final OutlierDetection outlierDetection;
final Map<String, Struct> filterMetadata;
enum Type {
EDS,
@ -138,7 +141,7 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
private DiscoveryMechanism(String cluster, Type type, @Nullable String edsServiceName,
@Nullable String dnsHostName, @Nullable ServerInfo lrsServerInfo,
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
@Nullable OutlierDetection outlierDetection) {
Map<String, Struct> filterMetadata, @Nullable OutlierDetection outlierDetection) {
this.cluster = checkNotNull(cluster, "cluster");
this.type = checkNotNull(type, "type");
this.edsServiceName = edsServiceName;
@ -146,28 +149,29 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
this.lrsServerInfo = lrsServerInfo;
this.maxConcurrentRequests = maxConcurrentRequests;
this.tlsContext = tlsContext;
this.filterMetadata = ImmutableMap.copyOf(checkNotNull(filterMetadata, "filterMetadata"));
this.outlierDetection = outlierDetection;
}
static DiscoveryMechanism forEds(String cluster, @Nullable String edsServiceName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext,
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata,
OutlierDetection outlierDetection) {
return new DiscoveryMechanism(cluster, Type.EDS, edsServiceName, null, lrsServerInfo,
maxConcurrentRequests, tlsContext, outlierDetection);
maxConcurrentRequests, tlsContext, filterMetadata, outlierDetection);
}
static DiscoveryMechanism forLogicalDns(String cluster, String dnsHostName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext) {
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata) {
return new DiscoveryMechanism(cluster, Type.LOGICAL_DNS, null, dnsHostName,
lrsServerInfo, maxConcurrentRequests, tlsContext, null);
lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null);
}
@Override
public int hashCode() {
return Objects.hash(cluster, type, lrsServerInfo, maxConcurrentRequests, tlsContext,
edsServiceName, dnsHostName);
edsServiceName, dnsHostName, filterMetadata, outlierDetection);
}
@Override
@ -185,7 +189,9 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
&& Objects.equals(dnsHostName, that.dnsHostName)
&& Objects.equals(lrsServerInfo, that.lrsServerInfo)
&& Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests)
&& Objects.equals(tlsContext, that.tlsContext);
&& Objects.equals(tlsContext, that.tlsContext)
&& Objects.equals(filterMetadata, that.filterMetadata)
&& Objects.equals(outlierDetection, that.outlierDetection);
}
@Override
@ -198,7 +204,10 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
.add("dnsHostName", dnsHostName)
.add("lrsServerInfo", lrsServerInfo)
// Exclude tlsContext as its string representation is cumbersome.
.add("maxConcurrentRequests", maxConcurrentRequests);
.add("maxConcurrentRequests", maxConcurrentRequests)
.add("filterMetadata", filterMetadata)
// Exclude outlierDetection as its string representation is long.
;
return toStringHelper.toString();
}
}

View File

@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.Struct;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
@ -160,6 +161,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
}
updateBuilder.lbPolicyConfig(lbPolicyConfig);
updateBuilder.filterMetadata(
ImmutableMap.copyOf(cluster.getMetadata().getFilterMetadataMap()));
return updateBuilder.build();
}
@ -559,14 +562,21 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
@Nullable
abstract OutlierDetection outlierDetection();
static Builder forAggregate(String clusterName, List<String> prioritizedClusterNames) {
checkNotNull(prioritizedClusterNames, "prioritizedClusterNames");
abstract ImmutableMap<String, Struct> filterMetadata();
private static Builder newBuilder(String clusterName) {
return new AutoValue_XdsClusterResource_CdsUpdate.Builder()
.clusterName(clusterName)
.clusterType(ClusterType.AGGREGATE)
.minRingSize(0)
.maxRingSize(0)
.choiceCount(0)
.filterMetadata(ImmutableMap.of());
}
static Builder forAggregate(String clusterName, List<String> prioritizedClusterNames) {
checkNotNull(prioritizedClusterNames, "prioritizedClusterNames");
return newBuilder(clusterName)
.clusterType(ClusterType.AGGREGATE)
.prioritizedClusterNames(ImmutableList.copyOf(prioritizedClusterNames));
}
@ -574,12 +584,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext upstreamTlsContext,
@Nullable OutlierDetection outlierDetection) {
return new AutoValue_XdsClusterResource_CdsUpdate.Builder()
.clusterName(clusterName)
return newBuilder(clusterName)
.clusterType(ClusterType.EDS)
.minRingSize(0)
.maxRingSize(0)
.choiceCount(0)
.edsServiceName(edsServiceName)
.lrsServerInfo(lrsServerInfo)
.maxConcurrentRequests(maxConcurrentRequests)
@ -591,12 +597,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
@Nullable ServerInfo lrsServerInfo,
@Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext upstreamTlsContext) {
return new AutoValue_XdsClusterResource_CdsUpdate.Builder()
.clusterName(clusterName)
return newBuilder(clusterName)
.clusterType(ClusterType.LOGICAL_DNS)
.minRingSize(0)
.maxRingSize(0)
.choiceCount(0)
.dnsHostName(dnsHostName)
.lrsServerInfo(lrsServerInfo)
.maxConcurrentRequests(maxConcurrentRequests)
@ -685,6 +687,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
protected abstract Builder outlierDetection(OutlierDetection outlierDetection);
protected abstract Builder filterMetadata(ImmutableMap<String, Struct> filterMetadata);
abstract CdsUpdate build();
}
}

View File

@ -177,7 +177,8 @@ public class ClusterImplLoadBalancerTest {
Object weightedTargetConfig = new Object();
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
null, Collections.<DropOverload>emptyList(),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
Collections.emptyMap());
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers);
@ -202,7 +203,8 @@ public class ClusterImplLoadBalancerTest {
ClusterImplConfig configWithWeightedTarget = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME,
LRS_SERVER_INFO,
null, Collections.<DropOverload>emptyList(),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
Collections.emptyMap());
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
deliverAddressesAndConfig(Collections.singletonList(endpoint), configWithWeightedTarget);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers);
@ -215,7 +217,8 @@ public class ClusterImplLoadBalancerTest {
ClusterImplConfig configWithWrrLocality = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME,
LRS_SERVER_INFO,
null, Collections.<DropOverload>emptyList(),
new PolicySelection(wrrLocalityProvider, wrrLocalityConfig), null);
new PolicySelection(wrrLocalityProvider, wrrLocalityConfig), null,
Collections.emptyMap());
deliverAddressesAndConfig(Collections.singletonList(endpoint), configWithWrrLocality);
childBalancer = Iterables.getOnlyElement(downstreamBalancers);
assertThat(childBalancer.name).isEqualTo(XdsLbPolicies.WRR_LOCALITY_POLICY_NAME);
@ -239,7 +242,8 @@ public class ClusterImplLoadBalancerTest {
Object weightedTargetConfig = new Object();
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
null, Collections.<DropOverload>emptyList(),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
Collections.emptyMap());
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers);
@ -258,7 +262,8 @@ public class ClusterImplLoadBalancerTest {
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
null, Collections.<DropOverload>emptyList(),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
Collections.emptyMap());
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers);
@ -284,7 +289,8 @@ public class ClusterImplLoadBalancerTest {
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
null, Collections.<DropOverload>emptyList(),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
Collections.emptyMap());
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers);
@ -368,7 +374,8 @@ public class ClusterImplLoadBalancerTest {
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
null, Collections.singletonList(DropOverload.create("throttle", 500_000)),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
Collections.emptyMap());
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
when(mockRandom.nextInt(anyInt())).thenReturn(499_999, 999_999, 1_000_000);
@ -397,7 +404,8 @@ public class ClusterImplLoadBalancerTest {
// Config update updates drop policies.
config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null,
Collections.singletonList(DropOverload.create("lb", 1_000_000)),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
Collections.emptyMap());
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Collections.singletonList(endpoint))
@ -444,7 +452,8 @@ public class ClusterImplLoadBalancerTest {
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
maxConcurrentRequests, Collections.<DropOverload>emptyList(),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
Collections.emptyMap());
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
assertThat(downstreamBalancers).hasSize(1); // one leaf balancer
@ -486,7 +495,8 @@ public class ClusterImplLoadBalancerTest {
maxConcurrentRequests = 101L;
config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
maxConcurrentRequests, Collections.<DropOverload>emptyList(),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
Collections.emptyMap());
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
result = currentPicker.pickSubchannel(pickSubchannelArgs);
@ -532,7 +542,8 @@ public class ClusterImplLoadBalancerTest {
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
null, Collections.<DropOverload>emptyList(),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
Collections.emptyMap());
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
assertThat(downstreamBalancers).hasSize(1); // one leaf balancer
@ -578,7 +589,8 @@ public class ClusterImplLoadBalancerTest {
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
null, Collections.<DropOverload>emptyList(),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
Collections.emptyMap());
// One locality with two endpoints.
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr1", locality);
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr2", locality);
@ -615,7 +627,8 @@ public class ClusterImplLoadBalancerTest {
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
null, Collections.<DropOverload>emptyList(),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), upstreamTlsContext);
new PolicySelection(weightedTargetProvider, weightedTargetConfig), upstreamTlsContext,
Collections.emptyMap());
// One locality with two endpoints.
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr1", locality);
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr2", locality);
@ -638,7 +651,8 @@ public class ClusterImplLoadBalancerTest {
// Removes UpstreamTlsContext from the config.
config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
null, Collections.<DropOverload>emptyList(),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
Collections.emptyMap());
deliverAddressesAndConfig(Arrays.asList(endpoint1, endpoint2), config);
assertThat(Iterables.getOnlyElement(downstreamBalancers)).isSameInstanceAs(leafBalancer);
subchannel = leafBalancer.helper.createSubchannel(args); // creates new connections
@ -652,7 +666,8 @@ public class ClusterImplLoadBalancerTest {
CommonTlsContextTestsUtil.buildUpstreamTlsContext("google_cloud_private_spiffe1", true);
config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
null, Collections.<DropOverload>emptyList(),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), upstreamTlsContext);
new PolicySelection(weightedTargetProvider, weightedTargetConfig), upstreamTlsContext,
Collections.emptyMap());
deliverAddressesAndConfig(Arrays.asList(endpoint1, endpoint2), config);
assertThat(Iterables.getOnlyElement(downstreamBalancers)).isSameInstanceAs(leafBalancer);
subchannel = leafBalancer.helper.createSubchannel(args); // creates new connections

View File

@ -136,15 +136,16 @@ public class ClusterResolverLoadBalancerTest {
FailurePercentageEjection.create(100, 100, 100, 100));
private final DiscoveryMechanism edsDiscoveryMechanism1 =
DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L, tlsContext,
null);
Collections.emptyMap(), null);
private final DiscoveryMechanism edsDiscoveryMechanism2 =
DiscoveryMechanism.forEds(CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_INFO, 200L, tlsContext,
null);
Collections.emptyMap(), null);
private final DiscoveryMechanism edsDiscoveryMechanismWithOutlierDetection =
DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L, tlsContext,
outlierDetection);
Collections.emptyMap(), outlierDetection);
private final DiscoveryMechanism logicalDnsDiscoveryMechanism =
DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, DNS_HOST_NAME, LRS_SERVER_INFO, 300L, null);
DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, DNS_HOST_NAME, LRS_SERVER_INFO, 300L, null,
Collections.emptyMap());
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {