mirror of https://github.com/grpc/grpc-java.git
xds: Avoid switchTo in ClusterImplLb and ClusterResolverLb
This commit is contained in:
parent
749b2e0abc
commit
dfb22ba97c
|
@ -23,15 +23,12 @@ import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.grpc.InternalLogId;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.LoadBalancerRegistry;
|
||||
import io.grpc.NameResolver;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.internal.ServiceConfigUtil;
|
||||
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
|
||||
import io.grpc.util.GracefulSwitchLoadBalancer;
|
||||
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
|
||||
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
|
||||
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
|
||||
|
@ -43,6 +40,7 @@ import io.grpc.xds.client.XdsLogger;
|
|||
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -234,26 +232,17 @@ final class CdsLoadBalancer2 extends LoadBalancer {
|
|||
return;
|
||||
}
|
||||
|
||||
// The LB policy config is provided in service_config.proto/JSON format. It is unwrapped
|
||||
// to determine the name of the policy in the load balancer registry.
|
||||
LbConfig unwrappedLbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
root.result.lbPolicyConfig());
|
||||
LoadBalancerProvider lbProvider = lbRegistry.getProvider(unwrappedLbConfig.getPolicyName());
|
||||
if (lbProvider == null) {
|
||||
throw NameResolver.ConfigOrError.fromError(Status.UNAVAILABLE.withDescription(
|
||||
"No provider available for LB: " + unwrappedLbConfig.getPolicyName())).getError()
|
||||
.asRuntimeException();
|
||||
}
|
||||
NameResolver.ConfigOrError configOrError = lbProvider.parseLoadBalancingPolicyConfig(
|
||||
unwrappedLbConfig.getRawConfigValue());
|
||||
// The LB policy config is provided in service_config.proto/JSON format.
|
||||
NameResolver.ConfigOrError configOrError =
|
||||
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
|
||||
Arrays.asList(root.result.lbPolicyConfig()), lbRegistry);
|
||||
if (configOrError.getError() != null) {
|
||||
throw configOrError.getError().augmentDescription("Unable to parse the LB config")
|
||||
.asRuntimeException();
|
||||
}
|
||||
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.unmodifiableList(instances),
|
||||
new PolicySelection(lbProvider, configOrError.getConfig()));
|
||||
Collections.unmodifiableList(instances), configOrError.getConfig());
|
||||
if (childLb == null) {
|
||||
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
|
||||
}
|
||||
|
|
|
@ -145,11 +145,10 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
|
|||
childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
|
||||
childLbHelper.updateFilterMetadata(config.filterMetadata);
|
||||
|
||||
childSwitchLb.switchTo(config.childPolicy.getProvider());
|
||||
childSwitchLb.handleResolvedAddresses(
|
||||
resolvedAddresses.toBuilder()
|
||||
.setAttributes(attributes)
|
||||
.setLoadBalancingPolicyConfig(config.childPolicy.getConfig())
|
||||
.setLoadBalancingPolicyConfig(config.childConfig)
|
||||
.build());
|
||||
return Status.OK;
|
||||
}
|
||||
|
|
|
@ -29,7 +29,6 @@ import io.grpc.LoadBalancerProvider;
|
|||
import io.grpc.LoadBalancerRegistry;
|
||||
import io.grpc.NameResolver.ConfigOrError;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
|
||||
import io.grpc.xds.Endpoints.DropOverload;
|
||||
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
|
||||
import io.grpc.xds.client.Bootstrapper.ServerInfo;
|
||||
|
@ -97,12 +96,12 @@ public final class ClusterImplLoadBalancerProvider extends LoadBalancerProvider
|
|||
// Drop configurations.
|
||||
final List<DropOverload> dropCategories;
|
||||
// Provides the direct child policy and its config.
|
||||
final PolicySelection childPolicy;
|
||||
final Object childConfig;
|
||||
final Map<String, Struct> filterMetadata;
|
||||
|
||||
ClusterImplConfig(String cluster, @Nullable String edsServiceName,
|
||||
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
|
||||
List<DropOverload> dropCategories, PolicySelection childPolicy,
|
||||
List<DropOverload> dropCategories, Object childConfig,
|
||||
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata) {
|
||||
this.cluster = checkNotNull(cluster, "cluster");
|
||||
this.edsServiceName = edsServiceName;
|
||||
|
@ -112,7 +111,7 @@ public final class ClusterImplLoadBalancerProvider extends LoadBalancerProvider
|
|||
this.filterMetadata = ImmutableMap.copyOf(filterMetadata);
|
||||
this.dropCategories = Collections.unmodifiableList(
|
||||
new ArrayList<>(checkNotNull(dropCategories, "dropCategories")));
|
||||
this.childPolicy = checkNotNull(childPolicy, "childPolicy");
|
||||
this.childConfig = checkNotNull(childConfig, "childConfig");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -124,7 +123,7 @@ public final class ClusterImplLoadBalancerProvider extends LoadBalancerProvider
|
|||
.add("maxConcurrentRequests", maxConcurrentRequests)
|
||||
// Exclude tlsContext as its string representation is cumbersome.
|
||||
.add("dropCategories", dropCategories)
|
||||
.add("childPolicy", childPolicy)
|
||||
.add("childConfig", childConfig)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -127,9 +127,11 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
(ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||
if (!Objects.equals(this.config, config)) {
|
||||
logger.log(XdsLogLevel.DEBUG, "Config: {0}", config);
|
||||
delegate.switchTo(new ClusterResolverLbStateFactory());
|
||||
this.config = config;
|
||||
delegate.handleResolvedAddresses(resolvedAddresses);
|
||||
Object gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
new ClusterResolverLbStateFactory(), config);
|
||||
delegate.handleResolvedAddresses(
|
||||
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(gracefulConfig).build());
|
||||
}
|
||||
return Status.OK;
|
||||
}
|
||||
|
@ -165,7 +167,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
private final Helper helper;
|
||||
private final List<String> clusters = new ArrayList<>();
|
||||
private final Map<String, ClusterState> clusterStates = new HashMap<>();
|
||||
private PolicySelection endpointLbPolicy;
|
||||
private Object endpointLbConfig;
|
||||
private ResolvedAddresses resolvedAddresses;
|
||||
private LoadBalancer childLb;
|
||||
|
||||
|
@ -179,7 +181,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
this.resolvedAddresses = resolvedAddresses;
|
||||
ClusterResolverConfig config =
|
||||
(ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||
endpointLbPolicy = config.lbPolicy;
|
||||
endpointLbConfig = config.lbConfig;
|
||||
for (DiscoveryMechanism instance : config.discoveryMechanisms) {
|
||||
clusters.add(instance.cluster);
|
||||
ClusterState state;
|
||||
|
@ -454,7 +456,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
Map<String, PriorityChildConfig> priorityChildConfigs =
|
||||
generateEdsBasedPriorityChildConfigs(
|
||||
name, edsServiceName, lrsServerInfo, maxConcurrentRequests, tlsContext,
|
||||
filterMetadata, outlierDetection, endpointLbPolicy, lbRegistry,
|
||||
filterMetadata, outlierDetection, endpointLbConfig, lbRegistry,
|
||||
prioritizedLocalityWeights, dropOverloads);
|
||||
status = Status.OK;
|
||||
resolved = true;
|
||||
|
@ -717,11 +719,11 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata,
|
||||
LoadBalancerRegistry lbRegistry, List<DropOverload> dropOverloads) {
|
||||
// Override endpoint-level LB policy with pick_first for logical DNS cluster.
|
||||
PolicySelection endpointLbPolicy =
|
||||
new PolicySelection(lbRegistry.getProvider("pick_first"), null);
|
||||
Object endpointLbConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
lbRegistry.getProvider("pick_first"), null);
|
||||
ClusterImplConfig clusterImplConfig =
|
||||
new ClusterImplConfig(cluster, null, lrsServerInfo, maxConcurrentRequests,
|
||||
dropOverloads, endpointLbPolicy, tlsContext, filterMetadata);
|
||||
dropOverloads, endpointLbConfig, tlsContext, filterMetadata);
|
||||
LoadBalancerProvider clusterImplLbProvider =
|
||||
lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
|
||||
PolicySelection clusterImplPolicy =
|
||||
|
@ -739,14 +741,14 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo,
|
||||
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
|
||||
Map<String, Struct> filterMetadata,
|
||||
@Nullable OutlierDetection outlierDetection, PolicySelection endpointLbPolicy,
|
||||
@Nullable OutlierDetection outlierDetection, Object endpointLbConfig,
|
||||
LoadBalancerRegistry lbRegistry, Map<String,
|
||||
Map<Locality, Integer>> prioritizedLocalityWeights, List<DropOverload> dropOverloads) {
|
||||
Map<String, PriorityChildConfig> configs = new HashMap<>();
|
||||
for (String priority : prioritizedLocalityWeights.keySet()) {
|
||||
ClusterImplConfig clusterImplConfig =
|
||||
new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests,
|
||||
dropOverloads, endpointLbPolicy, tlsContext, filterMetadata);
|
||||
dropOverloads, endpointLbConfig, tlsContext, filterMetadata);
|
||||
LoadBalancerProvider clusterImplLbProvider =
|
||||
lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
|
||||
PolicySelection priorityChildPolicy =
|
||||
|
|
|
@ -27,7 +27,6 @@ import io.grpc.LoadBalancer.Helper;
|
|||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.NameResolver.ConfigOrError;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
|
||||
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
|
||||
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
|
||||
import io.grpc.xds.client.Bootstrapper.ServerInfo;
|
||||
|
@ -73,18 +72,17 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
|
|||
static final class ClusterResolverConfig {
|
||||
// Ordered list of clusters to be resolved.
|
||||
final List<DiscoveryMechanism> discoveryMechanisms;
|
||||
// Endpoint-level load balancing policy with config
|
||||
// (round_robin, least_request_experimental or ring_hash_experimental).
|
||||
final PolicySelection lbPolicy;
|
||||
// GracefulSwitch configuration
|
||||
final Object lbConfig;
|
||||
|
||||
ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, PolicySelection lbPolicy) {
|
||||
ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig) {
|
||||
this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms");
|
||||
this.lbPolicy = checkNotNull(lbPolicy, "lbPolicy");
|
||||
this.lbConfig = checkNotNull(lbConfig, "lbConfig");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(discoveryMechanisms, lbPolicy);
|
||||
return Objects.hash(discoveryMechanisms, lbConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -97,14 +95,14 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
|
|||
}
|
||||
ClusterResolverConfig that = (ClusterResolverConfig) o;
|
||||
return discoveryMechanisms.equals(that.discoveryMechanisms)
|
||||
&& lbPolicy.equals(that.lbPolicy);
|
||||
&& lbConfig.equals(that.lbConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("discoveryMechanisms", discoveryMechanisms)
|
||||
.add("lbPolicy", lbPolicy)
|
||||
.add("lbConfig", lbConfig)
|
||||
.toString();
|
||||
}
|
||||
|
||||
|
|
|
@ -48,6 +48,7 @@ import io.grpc.Status;
|
|||
import io.grpc.Status.Code;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.util.GracefulSwitchLoadBalancerAccessor;
|
||||
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
|
||||
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
|
||||
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
|
||||
|
@ -177,7 +178,9 @@ public class CdsLoadBalancer2Test {
|
|||
DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
|
||||
assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME,
|
||||
null, LRS_SERVER_INFO, 100L, upstreamTlsContext, outlierDetection);
|
||||
assertThat(childLbConfig.lbPolicy.getProvider().getPolicyName()).isEqualTo("round_robin");
|
||||
assertThat(
|
||||
GracefulSwitchLoadBalancerAccessor.getChildProvider(childLbConfig.lbConfig).getPolicyName())
|
||||
.isEqualTo("round_robin");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -194,9 +197,12 @@ public class CdsLoadBalancer2Test {
|
|||
DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
|
||||
assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.LOGICAL_DNS, null,
|
||||
DNS_HOST_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, null);
|
||||
assertThat(childLbConfig.lbPolicy.getProvider().getPolicyName())
|
||||
assertThat(
|
||||
GracefulSwitchLoadBalancerAccessor.getChildProvider(childLbConfig.lbConfig).getPolicyName())
|
||||
.isEqualTo("least_request_experimental");
|
||||
assertThat(((LeastRequestConfig) childLbConfig.lbPolicy.getConfig()).choiceCount).isEqualTo(3);
|
||||
LeastRequestConfig lrConfig = (LeastRequestConfig)
|
||||
GracefulSwitchLoadBalancerAccessor.getChildConfig(childLbConfig.lbConfig);
|
||||
assertThat(lrConfig.choiceCount).isEqualTo(3);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -303,10 +309,13 @@ public class CdsLoadBalancer2Test {
|
|||
upstreamTlsContext, outlierDetection);
|
||||
assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(2), cluster4,
|
||||
DiscoveryMechanism.Type.EDS, null, null, LRS_SERVER_INFO, 300L, null, outlierDetection);
|
||||
assertThat(childLbConfig.lbPolicy.getProvider().getPolicyName())
|
||||
assertThat(
|
||||
GracefulSwitchLoadBalancerAccessor.getChildProvider(childLbConfig.lbConfig).getPolicyName())
|
||||
.isEqualTo("ring_hash_experimental"); // dominated by top-level cluster's config
|
||||
assertThat(((RingHashConfig) childLbConfig.lbPolicy.getConfig()).minRingSize).isEqualTo(100L);
|
||||
assertThat(((RingHashConfig) childLbConfig.lbPolicy.getConfig()).maxRingSize).isEqualTo(1000L);
|
||||
RingHashConfig ringHashConfig = (RingHashConfig)
|
||||
GracefulSwitchLoadBalancerAccessor.getChildConfig(childLbConfig.lbConfig);
|
||||
assertThat(ringHashConfig.minRingSize).isEqualTo(100L);
|
||||
assertThat(ringHashConfig.maxRingSize).isEqualTo(1000L);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -665,9 +674,9 @@ public class CdsLoadBalancer2Test {
|
|||
xdsClient.deliverCdsUpdate(CLUSTER,
|
||||
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
|
||||
outlierDetection)
|
||||
.lbPolicyConfig(ImmutableMap.of("unknown", ImmutableMap.of("foo", "bar"))).build());
|
||||
.lbPolicyConfig(ImmutableMap.of("unknownLb", ImmutableMap.of("foo", "bar"))).build());
|
||||
} catch (Exception e) {
|
||||
assertThat(e).hasMessageThat().contains("No provider available");
|
||||
assertThat(e).hasMessageThat().contains("unknownLb");
|
||||
return;
|
||||
}
|
||||
fail("Expected the unknown LB to cause an exception");
|
||||
|
|
|
@ -49,7 +49,6 @@ import io.grpc.SynchronizationContext;
|
|||
import io.grpc.internal.FakeClock;
|
||||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.internal.PickSubchannelArgsImpl;
|
||||
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
|
||||
import io.grpc.protobuf.ProtoUtils;
|
||||
import io.grpc.testing.TestMethodDescriptors;
|
||||
import io.grpc.util.GracefulSwitchLoadBalancer;
|
||||
|
@ -178,8 +177,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
Object weightedTargetConfig = new Object();
|
||||
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
|
||||
null, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
weightedTargetProvider, weightedTargetConfig),
|
||||
null, Collections.emptyMap());
|
||||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
|
||||
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers);
|
||||
|
@ -204,8 +204,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
ClusterImplConfig configWithWeightedTarget = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME,
|
||||
LRS_SERVER_INFO,
|
||||
null, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
weightedTargetProvider, weightedTargetConfig),
|
||||
null, Collections.emptyMap());
|
||||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
|
||||
deliverAddressesAndConfig(Collections.singletonList(endpoint), configWithWeightedTarget);
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers);
|
||||
|
@ -218,8 +219,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
ClusterImplConfig configWithWrrLocality = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME,
|
||||
LRS_SERVER_INFO,
|
||||
null, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(wrrLocalityProvider, wrrLocalityConfig), null,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
wrrLocalityProvider, wrrLocalityConfig),
|
||||
null, Collections.emptyMap());
|
||||
deliverAddressesAndConfig(Collections.singletonList(endpoint), configWithWrrLocality);
|
||||
childBalancer = Iterables.getOnlyElement(downstreamBalancers);
|
||||
assertThat(childBalancer.name).isEqualTo(XdsLbPolicies.WRR_LOCALITY_POLICY_NAME);
|
||||
|
@ -243,8 +245,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
Object weightedTargetConfig = new Object();
|
||||
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
|
||||
null, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
weightedTargetProvider, weightedTargetConfig),
|
||||
null, Collections.emptyMap());
|
||||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
|
||||
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers);
|
||||
|
@ -263,8 +266,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
|
||||
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
|
||||
null, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
weightedTargetProvider, weightedTargetConfig),
|
||||
null, Collections.emptyMap());
|
||||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
|
||||
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
|
||||
FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers);
|
||||
|
@ -290,8 +294,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
|
||||
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
|
||||
null, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
weightedTargetProvider, weightedTargetConfig),
|
||||
null, Collections.emptyMap());
|
||||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
|
||||
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
|
||||
FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers);
|
||||
|
@ -375,8 +380,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
|
||||
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
|
||||
null, Collections.singletonList(DropOverload.create("throttle", 500_000)),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
weightedTargetProvider, weightedTargetConfig),
|
||||
null, Collections.emptyMap());
|
||||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
|
||||
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
|
||||
when(mockRandom.nextInt(anyInt())).thenReturn(499_999, 999_999, 1_000_000);
|
||||
|
@ -405,8 +411,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
// Config update updates drop policies.
|
||||
config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null,
|
||||
Collections.singletonList(DropOverload.create("lb", 1_000_000)),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
weightedTargetProvider, weightedTargetConfig),
|
||||
null, Collections.emptyMap());
|
||||
loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(Collections.singletonList(endpoint))
|
||||
|
@ -453,8 +460,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
|
||||
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
|
||||
maxConcurrentRequests, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
weightedTargetProvider, weightedTargetConfig),
|
||||
null, Collections.emptyMap());
|
||||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
|
||||
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
|
||||
assertThat(downstreamBalancers).hasSize(1); // one leaf balancer
|
||||
|
@ -496,8 +504,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
maxConcurrentRequests = 101L;
|
||||
config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
|
||||
maxConcurrentRequests, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
weightedTargetProvider, weightedTargetConfig),
|
||||
null, Collections.emptyMap());
|
||||
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
|
||||
|
||||
result = currentPicker.pickSubchannel(pickSubchannelArgs);
|
||||
|
@ -543,8 +552,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
|
||||
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
|
||||
null, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
weightedTargetProvider, weightedTargetConfig),
|
||||
null, Collections.emptyMap());
|
||||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
|
||||
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
|
||||
assertThat(downstreamBalancers).hasSize(1); // one leaf balancer
|
||||
|
@ -590,8 +600,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
|
||||
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
|
||||
null, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
weightedTargetProvider, weightedTargetConfig),
|
||||
null, Collections.emptyMap());
|
||||
// One locality with two endpoints.
|
||||
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr1", locality);
|
||||
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr2", locality);
|
||||
|
@ -628,8 +639,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
|
||||
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
|
||||
null, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), upstreamTlsContext,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
weightedTargetProvider, weightedTargetConfig),
|
||||
upstreamTlsContext, Collections.emptyMap());
|
||||
// One locality with two endpoints.
|
||||
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr1", locality);
|
||||
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr2", locality);
|
||||
|
@ -652,8 +664,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
// Removes UpstreamTlsContext from the config.
|
||||
config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
|
||||
null, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
weightedTargetProvider, weightedTargetConfig),
|
||||
null, Collections.emptyMap());
|
||||
deliverAddressesAndConfig(Arrays.asList(endpoint1, endpoint2), config);
|
||||
assertThat(Iterables.getOnlyElement(downstreamBalancers)).isSameInstanceAs(leafBalancer);
|
||||
subchannel = leafBalancer.helper.createSubchannel(args); // creates new connections
|
||||
|
@ -667,8 +680,9 @@ public class ClusterImplLoadBalancerTest {
|
|||
CommonTlsContextTestsUtil.buildUpstreamTlsContext("google_cloud_private_spiffe1", true);
|
||||
config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
|
||||
null, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), upstreamTlsContext,
|
||||
Collections.emptyMap());
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
weightedTargetProvider, weightedTargetConfig),
|
||||
upstreamTlsContext, Collections.emptyMap());
|
||||
deliverAddressesAndConfig(Arrays.asList(endpoint1, endpoint2), config);
|
||||
assertThat(Iterables.getOnlyElement(downstreamBalancers)).isSameInstanceAs(leafBalancer);
|
||||
subchannel = leafBalancer.helper.createSubchannel(args); // creates new connections
|
||||
|
|
|
@ -58,7 +58,6 @@ import io.grpc.internal.FakeClock;
|
|||
import io.grpc.internal.FakeClock.ScheduledTask;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
|
||||
import io.grpc.util.GracefulSwitchLoadBalancer;
|
||||
import io.grpc.util.GracefulSwitchLoadBalancerAccessor;
|
||||
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
|
||||
|
@ -159,13 +158,13 @@ public class ClusterResolverLoadBalancerTest {
|
|||
private final FakeClock fakeClock = new FakeClock();
|
||||
private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry();
|
||||
private final NameResolverRegistry nsRegistry = new NameResolverRegistry();
|
||||
private final PolicySelection roundRobin = new PolicySelection(
|
||||
private final Object roundRobin = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
new FakeLoadBalancerProvider("wrr_locality_experimental"), new WrrLocalityConfig(
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
new FakeLoadBalancerProvider("round_robin"), null)));
|
||||
private final PolicySelection ringHash = new PolicySelection(
|
||||
private final Object ringHash = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
new FakeLoadBalancerProvider("ring_hash_experimental"), new RingHashConfig(10L, 100L));
|
||||
private final PolicySelection leastRequest = new PolicySelection(
|
||||
private final Object leastRequest = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
new FakeLoadBalancerProvider("wrr_locality_experimental"), new WrrLocalityConfig(
|
||||
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
|
||||
new FakeLoadBalancerProvider("least_request_experimental"),
|
||||
|
@ -293,8 +292,8 @@ public class ClusterResolverLoadBalancerTest {
|
|||
(ClusterImplConfig) priorityChildConfig.policySelection.getConfig();
|
||||
assertClusterImplConfig(clusterImplConfig, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L,
|
||||
tlsContext, Collections.<DropOverload>emptyList(), "ring_hash_experimental");
|
||||
RingHashConfig ringHashConfig =
|
||||
(RingHashConfig) clusterImplConfig.childPolicy.getConfig();
|
||||
RingHashConfig ringHashConfig = (RingHashConfig)
|
||||
GracefulSwitchLoadBalancerAccessor.getChildConfig(clusterImplConfig.childConfig);
|
||||
assertThat(ringHashConfig.minRingSize).isEqualTo(10L);
|
||||
assertThat(ringHashConfig.maxRingSize).isEqualTo(100L);
|
||||
}
|
||||
|
@ -333,8 +332,8 @@ public class ClusterResolverLoadBalancerTest {
|
|||
(ClusterImplConfig) priorityChildConfig.policySelection.getConfig();
|
||||
assertClusterImplConfig(clusterImplConfig, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L,
|
||||
tlsContext, Collections.<DropOverload>emptyList(), WRR_LOCALITY_POLICY_NAME);
|
||||
WrrLocalityConfig wrrLocalityConfig =
|
||||
(WrrLocalityConfig) clusterImplConfig.childPolicy.getConfig();
|
||||
WrrLocalityConfig wrrLocalityConfig = (WrrLocalityConfig)
|
||||
GracefulSwitchLoadBalancerAccessor.getChildConfig(clusterImplConfig.childConfig);
|
||||
LoadBalancerProvider childProvider =
|
||||
GracefulSwitchLoadBalancerAccessor.getChildProvider(wrrLocalityConfig.childConfig);
|
||||
assertThat(childProvider.getPolicyName()).isEqualTo("least_request_experimental");
|
||||
|
@ -417,8 +416,8 @@ public class ClusterResolverLoadBalancerTest {
|
|||
GracefulSwitchLoadBalancerAccessor.getChildConfig(outlierDetectionConfig.childConfig);
|
||||
assertClusterImplConfig(clusterImplConfig, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L,
|
||||
tlsContext, Collections.<DropOverload>emptyList(), WRR_LOCALITY_POLICY_NAME);
|
||||
WrrLocalityConfig wrrLocalityConfig =
|
||||
(WrrLocalityConfig) clusterImplConfig.childPolicy.getConfig();
|
||||
WrrLocalityConfig wrrLocalityConfig = (WrrLocalityConfig)
|
||||
GracefulSwitchLoadBalancerAccessor.getChildConfig(clusterImplConfig.childConfig);
|
||||
LoadBalancerProvider childProvider =
|
||||
GracefulSwitchLoadBalancerAccessor.getChildProvider(wrrLocalityConfig.childConfig);
|
||||
assertThat(childProvider.getPolicyName()).isEqualTo("least_request_experimental");
|
||||
|
@ -487,9 +486,8 @@ public class ClusterResolverLoadBalancerTest {
|
|||
(ClusterImplConfig) priorityChildConfig1.policySelection.getConfig();
|
||||
assertClusterImplConfig(clusterImplConfig1, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_INFO, 200L,
|
||||
tlsContext, Collections.<DropOverload>emptyList(), WRR_LOCALITY_POLICY_NAME);
|
||||
assertThat(clusterImplConfig1.childPolicy.getConfig()).isInstanceOf(WrrLocalityConfig.class);
|
||||
WrrLocalityConfig wrrLocalityConfig1 =
|
||||
(WrrLocalityConfig) clusterImplConfig1.childPolicy.getConfig();
|
||||
WrrLocalityConfig wrrLocalityConfig1 = (WrrLocalityConfig)
|
||||
GracefulSwitchLoadBalancerAccessor.getChildConfig(clusterImplConfig1.childConfig);
|
||||
LoadBalancerProvider childProvider1 =
|
||||
GracefulSwitchLoadBalancerAccessor.getChildProvider(wrrLocalityConfig1.childConfig);
|
||||
assertThat(childProvider1.getPolicyName()).isEqualTo("round_robin");
|
||||
|
@ -502,9 +500,8 @@ public class ClusterResolverLoadBalancerTest {
|
|||
(ClusterImplConfig) priorityChildConfig2.policySelection.getConfig();
|
||||
assertClusterImplConfig(clusterImplConfig2, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_INFO, 200L,
|
||||
tlsContext, Collections.<DropOverload>emptyList(), WRR_LOCALITY_POLICY_NAME);
|
||||
assertThat(clusterImplConfig2.childPolicy.getConfig()).isInstanceOf(WrrLocalityConfig.class);
|
||||
WrrLocalityConfig wrrLocalityConfig2 =
|
||||
(WrrLocalityConfig) clusterImplConfig1.childPolicy.getConfig();
|
||||
WrrLocalityConfig wrrLocalityConfig2 = (WrrLocalityConfig)
|
||||
GracefulSwitchLoadBalancerAccessor.getChildConfig(clusterImplConfig1.childConfig);
|
||||
LoadBalancerProvider childProvider2 =
|
||||
GracefulSwitchLoadBalancerAccessor.getChildProvider(wrrLocalityConfig2.childConfig);
|
||||
assertThat(childProvider2.getPolicyName()).isEqualTo("round_robin");
|
||||
|
@ -517,9 +514,8 @@ public class ClusterResolverLoadBalancerTest {
|
|||
(ClusterImplConfig) priorityChildConfig3.policySelection.getConfig();
|
||||
assertClusterImplConfig(clusterImplConfig3, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L,
|
||||
tlsContext, Collections.<DropOverload>emptyList(), WRR_LOCALITY_POLICY_NAME);
|
||||
assertThat(clusterImplConfig3.childPolicy.getConfig()).isInstanceOf(WrrLocalityConfig.class);
|
||||
WrrLocalityConfig wrrLocalityConfig3 =
|
||||
(WrrLocalityConfig) clusterImplConfig1.childPolicy.getConfig();
|
||||
WrrLocalityConfig wrrLocalityConfig3 = (WrrLocalityConfig)
|
||||
GracefulSwitchLoadBalancerAccessor.getChildConfig(clusterImplConfig1.childConfig);
|
||||
LoadBalancerProvider childProvider3 =
|
||||
GracefulSwitchLoadBalancerAccessor.getChildProvider(wrrLocalityConfig3.childConfig);
|
||||
assertThat(childProvider3.getPolicyName()).isEqualTo("round_robin");
|
||||
|
@ -1148,7 +1144,9 @@ public class ClusterResolverLoadBalancerTest {
|
|||
assertThat(config.maxConcurrentRequests).isEqualTo(maxConcurrentRequests);
|
||||
assertThat(config.tlsContext).isEqualTo(tlsContext);
|
||||
assertThat(config.dropCategories).isEqualTo(dropCategories);
|
||||
assertThat(config.childPolicy.getProvider().getPolicyName()).isEqualTo(childPolicy);
|
||||
assertThat(
|
||||
GracefulSwitchLoadBalancerAccessor.getChildProvider(config.childConfig).getPolicyName())
|
||||
.isEqualTo(childPolicy);
|
||||
}
|
||||
|
||||
/** Asserts two list of EAGs contains same addresses, regardless of attributes. */
|
||||
|
|
Loading…
Reference in New Issue