mirror of https://github.com/grpc/grpc-java.git
xds: Allow child of cluster_impl LB to change (#10091)
Under normal conditions the child LB of `ClusterImplLoadBalancer` does not fluctuate, based on the field used to configure load balancing in the xDS `Cluster` proto it is either: 1. `WrrLocalityLoadBalancer` if the newer `load_balancing_policy` field is used 2. `WeightedTargetLoadBalancer` if the legacy `lb_policy` field is used `ClusterImplLoadBalancer` currently assumes that this child does not change and so does not change the child LB when the name resolver sends an update. If the control plane does switch to using a different field for LB config, that update will have an LB config meant for the other child LB type. This will result in a ClassCastException and a channel panic. To address this, `ClusterImplLoadBalancer` will now use `GracefulSwitchLoadBalancer` and makes sure if the child policy changes the correct LB implementation is switched to.
This commit is contained in:
parent
8ca99f661d
commit
68b67b616e
|
@ -35,6 +35,7 @@ import io.grpc.internal.ForwardingClientStreamTracer;
|
|||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.util.ForwardingLoadBalancerHelper;
|
||||
import io.grpc.util.ForwardingSubchannel;
|
||||
import io.grpc.util.GracefulSwitchLoadBalancer;
|
||||
import io.grpc.xds.Bootstrapper.ServerInfo;
|
||||
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
|
||||
import io.grpc.xds.Endpoints.DropOverload;
|
||||
|
@ -87,7 +88,7 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
|
|||
private CallCounterProvider callCounterProvider;
|
||||
private ClusterDropStats dropStats;
|
||||
private ClusterImplLbHelper childLbHelper;
|
||||
private LoadBalancer childLb;
|
||||
private GracefulSwitchLoadBalancer childSwitchLb;
|
||||
|
||||
ClusterImplLoadBalancer(Helper helper) {
|
||||
this(helper, ThreadSafeRandomImpl.instance);
|
||||
|
@ -120,7 +121,7 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
|
|||
childLbHelper = new ClusterImplLbHelper(
|
||||
callCounterProvider.getOrCreate(config.cluster, config.edsServiceName),
|
||||
config.lrsServerInfo);
|
||||
childLb = config.childPolicy.getProvider().newLoadBalancer(childLbHelper);
|
||||
childSwitchLb = new GracefulSwitchLoadBalancer(childLbHelper);
|
||||
// Assume load report server does not change throughout cluster lifetime.
|
||||
if (config.lrsServerInfo != null) {
|
||||
dropStats = xdsClient.addClusterDropStats(config.lrsServerInfo, cluster, edsServiceName);
|
||||
|
@ -129,7 +130,9 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
|
|||
childLbHelper.updateDropPolicies(config.dropCategories);
|
||||
childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
|
||||
childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
|
||||
childLb.handleResolvedAddresses(
|
||||
|
||||
childSwitchLb.switchTo(config.childPolicy.getProvider());
|
||||
childSwitchLb.handleResolvedAddresses(
|
||||
resolvedAddresses.toBuilder()
|
||||
.setAttributes(attributes)
|
||||
.setLoadBalancingPolicyConfig(config.childPolicy.getConfig())
|
||||
|
@ -139,8 +142,8 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
|
|||
|
||||
@Override
|
||||
public void handleNameResolutionError(Status error) {
|
||||
if (childLb != null) {
|
||||
childLb.handleNameResolutionError(error);
|
||||
if (childSwitchLb != null) {
|
||||
childSwitchLb.handleNameResolutionError(error);
|
||||
} else {
|
||||
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker(error));
|
||||
}
|
||||
|
@ -151,8 +154,8 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
|
|||
if (dropStats != null) {
|
||||
dropStats.release();
|
||||
}
|
||||
if (childLb != null) {
|
||||
childLb.shutdown();
|
||||
if (childSwitchLb != null) {
|
||||
childSwitchLb.shutdown();
|
||||
if (childLbHelper != null) {
|
||||
childLbHelper.updateSslContextProviderSupplier(null);
|
||||
childLbHelper = null;
|
||||
|
|
|
@ -165,6 +165,41 @@ public class ClusterImplLoadBalancerTest {
|
|||
.isSameInstanceAs(xdsClientPool);
|
||||
}
|
||||
|
||||
/**
|
||||
* If the control plane switches from using the legacy lb_policy field in the xDS Cluster proto
|
||||
* to the newer load_balancing_policy then the child policy can switch from weighted_target to
|
||||
* xds_wrr_locality (this could happen the opposite way as well). This test assures that this
|
||||
* results in the child LB changing if this were to happen. If this is not done correctly the new
|
||||
* configuration would be given to the old LB implementation which would cause a channel panic.
|
||||
*/
|
||||
@Test
|
||||
public void handleResolvedAddresses_childPolicyChanges() {
|
||||
FakeLoadBalancerProvider weightedTargetProvider =
|
||||
new FakeLoadBalancerProvider(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME);
|
||||
Object weightedTargetConfig = new Object();
|
||||
ClusterImplConfig configWithWeightedTarget = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME,
|
||||
LRS_SERVER_INFO,
|
||||
null, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
|
||||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
|
||||
deliverAddressesAndConfig(Collections.singletonList(endpoint), configWithWeightedTarget);
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers);
|
||||
assertThat(childBalancer.name).isEqualTo(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME);
|
||||
assertThat(childBalancer.config).isSameInstanceAs(weightedTargetConfig);
|
||||
|
||||
FakeLoadBalancerProvider wrrLocalityProvider =
|
||||
new FakeLoadBalancerProvider(XdsLbPolicies.WRR_LOCALITY_POLICY_NAME);
|
||||
Object wrrLocalityConfig = new Object();
|
||||
ClusterImplConfig configWithWrrLocality = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME,
|
||||
LRS_SERVER_INFO,
|
||||
null, Collections.<DropOverload>emptyList(),
|
||||
new PolicySelection(wrrLocalityProvider, wrrLocalityConfig), null);
|
||||
deliverAddressesAndConfig(Collections.singletonList(endpoint), configWithWrrLocality);
|
||||
childBalancer = Iterables.getOnlyElement(downstreamBalancers);
|
||||
assertThat(childBalancer.name).isEqualTo(XdsLbPolicies.WRR_LOCALITY_POLICY_NAME);
|
||||
assertThat(childBalancer.config).isSameInstanceAs(wrrLocalityConfig);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void nameResolutionError_beforeChildPolicyInstantiated_returnErrorPickerToUpstream() {
|
||||
loadBalancer.handleNameResolutionError(Status.UNIMPLEMENTED.withDescription("not found"));
|
||||
|
|
Loading…
Reference in New Issue