xds: ClusterManager LB state/picker update fix (#9404)

* xds: ClusterManager LB state/picker update fix

Correctly set the currentState and curentPicker when the
child LB updates balancing state
This commit is contained in:
Terry Wilson 2022-07-21 13:54:40 -07:00 committed by GitHub
parent dcac7689fa
commit 4850ad219e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 15 deletions

View File

@ -264,14 +264,14 @@ class ClusterManagerLoadBalancer extends LoadBalancer {
final SubchannelPicker newPicker) { final SubchannelPicker newPicker) {
// If we are already in the process of resolving addresses, the overall balancing state // If we are already in the process of resolving addresses, the overall balancing state
// will be updated at the end of it, and we don't need to trigger that update here. // will be updated at the end of it, and we don't need to trigger that update here.
if (resolvingAddresses || !childLbStates.containsKey(name)) { if (!childLbStates.containsKey(name)) {
return; return;
} }
// Subchannel picker and state are saved, but will only be propagated to the channel // Subchannel picker and state are saved, but will only be propagated to the channel
// when the child instance exits deactivated state. // when the child instance exits deactivated state.
currentState = newState; currentState = newState;
currentPicker = newPicker; currentPicker = newPicker;
if (!deactivated) { if (!deactivated && !resolvingAddresses) {
updateOverallBalancingState(); updateOverallBalancingState();
} }
} }

View File

@ -20,6 +20,7 @@ import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.clearInvocations;
@ -91,7 +92,7 @@ public class ClusterManagerLoadBalancerTest {
private final Map<String, Object> lbConfigInventory = new HashMap<>(); private final Map<String, Object> lbConfigInventory = new HashMap<>();
private final List<FakeLoadBalancer> childBalancers = new ArrayList<>(); private final List<FakeLoadBalancer> childBalancers = new ArrayList<>();
private LoadBalancer clusterManagerLoadBalancer; private ClusterManagerLoadBalancer clusterManagerLoadBalancer;
@Before @Before
public void setUp() { public void setUp() {
@ -253,28 +254,33 @@ public class ClusterManagerLoadBalancerTest {
@Test @Test
public void noDuplicateOverallBalancingStateUpdate() { public void noDuplicateOverallBalancingStateUpdate() {
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b")); deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"), true);
// The test child LBs would have triggered state updates, let's make sure the overall balancing // The test child LBs would have triggered state updates, let's make sure the overall balancing
// state was only updated once. // state was only updated once but that the new state reflects the state the child LB reported.
verify(helper, times(1)).updateBalancingState(any(), any()); verify(helper, times(1)).updateBalancingState(
eq(TRANSIENT_FAILURE), isA(SubchannelPicker.class));
} }
private void deliverResolvedAddresses(final Map<String, String> childPolicies) { private void deliverResolvedAddresses(final Map<String, String> childPolicies) {
deliverResolvedAddresses(childPolicies, false);
}
private void deliverResolvedAddresses(final Map<String, String> childPolicies, boolean failing) {
clusterManagerLoadBalancer.handleResolvedAddresses( clusterManagerLoadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder() ResolvedAddresses.newBuilder()
.setAddresses(Collections.<EquivalentAddressGroup>emptyList()) .setAddresses(Collections.<EquivalentAddressGroup>emptyList())
.setLoadBalancingPolicyConfig(buildConfig(childPolicies)) .setLoadBalancingPolicyConfig(buildConfig(childPolicies, failing))
.build()); .build());
} }
private ClusterManagerConfig buildConfig(Map<String, String> childPolicies) { private ClusterManagerConfig buildConfig(Map<String, String> childPolicies, boolean failing) {
Map<String, PolicySelection> childPolicySelections = new LinkedHashMap<>(); Map<String, PolicySelection> childPolicySelections = new LinkedHashMap<>();
for (String name : childPolicies.keySet()) { for (String name : childPolicies.keySet()) {
String childPolicyName = childPolicies.get(name); String childPolicyName = childPolicies.get(name);
Object childConfig = lbConfigInventory.get(name); Object childConfig = lbConfigInventory.get(name);
PolicySelection policy = PolicySelection policy =
new PolicySelection(new FakeLoadBalancerProvider(childPolicyName), childConfig); new PolicySelection(new FakeLoadBalancerProvider(childPolicyName, failing), childConfig);
childPolicySelections.put(name, policy); childPolicySelections.put(name, policy);
} }
return new ClusterManagerConfig(childPolicySelections); return new ClusterManagerConfig(childPolicySelections);
@ -297,14 +303,16 @@ public class ClusterManagerLoadBalancerTest {
private final class FakeLoadBalancerProvider extends LoadBalancerProvider { private final class FakeLoadBalancerProvider extends LoadBalancerProvider {
private final String policyName; private final String policyName;
private final boolean failing;
FakeLoadBalancerProvider(String policyName) { FakeLoadBalancerProvider(String policyName, boolean failing) {
this.policyName = policyName; this.policyName = policyName;
this.failing = failing;
} }
@Override @Override
public LoadBalancer newLoadBalancer(Helper helper) { public LoadBalancer newLoadBalancer(Helper helper) {
FakeLoadBalancer balancer = new FakeLoadBalancer(policyName, helper); FakeLoadBalancer balancer = new FakeLoadBalancer(policyName, helper, failing);
childBalancers.add(balancer); childBalancers.add(balancer);
return balancer; return balancer;
} }
@ -328,22 +336,24 @@ public class ClusterManagerLoadBalancerTest {
private final class FakeLoadBalancer extends LoadBalancer { private final class FakeLoadBalancer extends LoadBalancer {
private final String name; private final String name;
private final Helper helper; private final Helper helper;
private final boolean failing;
private Object config; private Object config;
private Status upstreamError; private Status upstreamError;
private boolean shutdown; private boolean shutdown;
FakeLoadBalancer(String name, Helper helper) { FakeLoadBalancer(String name, Helper helper, boolean failing) {
this.name = name; this.name = name;
this.helper = helper; this.helper = helper;
this.failing = failing;
} }
@Override @Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
config = resolvedAddresses.getLoadBalancingPolicyConfig(); config = resolvedAddresses.getLoadBalancingPolicyConfig();
// Update balancing state here so that concurrent child state changes can be easily tested. if (failing) {
// Most tests ignore this and trigger separate child LB updates. helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL)); }
} }
@Override @Override