diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index 7a6bff4d77..5cae9139ae 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -58,7 +58,7 @@ final class PriorityLoadBalancer extends LoadBalancer { private final XdsLogger logger; // Includes all active and deactivated children. Mutable. New entries are only added from priority - // 0 up to the selected priority. An entry is only deleted 15 minutes after its deactivation. + // 0 up to the selected priority. An entry is only deleted 15 minutes after the its deactivation. private final Map children = new HashMap<>(); // Following fields are only null initially. @@ -70,8 +70,6 @@ final class PriorityLoadBalancer extends LoadBalancer { @Nullable private String currentPriority; private ConnectivityState currentConnectivityState; private SubchannelPicker currentPicker; - // Set to true if currently in the process of handling resolved addresses. - private boolean handlingResolvedAddresses; PriorityLoadBalancer(Helper helper) { this.helper = checkNotNull(helper, "helper"); @@ -84,15 +82,6 @@ final class PriorityLoadBalancer extends LoadBalancer { @Override public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { - try { - handlingResolvedAddresses = true; - handleResolvedAddressesInternal(resolvedAddresses); - } finally { - handlingResolvedAddresses = false; - } - } - - public void handleResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) { logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); this.resolvedAddresses = resolvedAddresses; PriorityLbConfig config = (PriorityLbConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); @@ -308,34 +297,32 @@ final class PriorityLoadBalancer extends LoadBalancer { @Override public void updateBalancingState(final ConnectivityState newState, final SubchannelPicker newPicker) { - if (!children.containsKey(priority)) { - return; - } - connectivityState = newState; - picker = newPicker; - - if (deletionTimer != null && deletionTimer.isPending()) { - return; - } - if (newState.equals(CONNECTING)) { - if (!failOverTimer.isPending() && seenReadyOrIdleSinceTransientFailure) { - failOverTimer = syncContext.schedule(new FailOverTask(), 10, TimeUnit.SECONDS, - executor); + syncContext.execute(new Runnable() { + @Override + public void run() { + if (!children.containsKey(priority)) { + return; + } + connectivityState = newState; + picker = newPicker; + if (deletionTimer != null && deletionTimer.isPending()) { + return; + } + if (newState.equals(CONNECTING) ) { + if (!failOverTimer.isPending() && seenReadyOrIdleSinceTransientFailure) { + failOverTimer = syncContext.schedule(new FailOverTask(), 10, TimeUnit.SECONDS, + executor); + } + } else if (newState.equals(READY) || newState.equals(IDLE)) { + seenReadyOrIdleSinceTransientFailure = true; + failOverTimer.cancel(); + } else if (newState.equals(TRANSIENT_FAILURE)) { + seenReadyOrIdleSinceTransientFailure = false; + failOverTimer.cancel(); + } + tryNextPriority(); } - } else if (newState.equals(READY) || newState.equals(IDLE)) { - seenReadyOrIdleSinceTransientFailure = true; - failOverTimer.cancel(); - } else if (newState.equals(TRANSIENT_FAILURE)) { - seenReadyOrIdleSinceTransientFailure = false; - failOverTimer.cancel(); - } - - // If we are currently handling newly resolved addresses, let's not try to reconfigure as - // the address handling process will take care of that to provide an atomic config update. - if (handlingResolvedAddresses) { - return; - } - tryNextPriority(); + }); } @Override diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index 7227dcb3b2..6b1fae48f1 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -22,8 +22,8 @@ import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doReturn; @@ -676,7 +676,7 @@ public class PriorityLoadBalancerTest { .setAddresses(ImmutableList.of()) .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, // any further balancing state update should be ignored. @@ -686,26 +686,6 @@ public class PriorityLoadBalancerTest { verifyNoMoreInteractions(helper); } - @Test - public void noDuplicateOverallBalancingStateUpdate() { - FakeLoadBalancerProvider fakeLbProvider = new FakeLoadBalancerProvider(); - - PriorityChildConfig priorityChildConfig0 = - new PriorityChildConfig(new PolicySelection(fakeLbProvider, new Object()), true); - PriorityChildConfig priorityChildConfig1 = - new PriorityChildConfig(new PolicySelection(fakeLbProvider, new Object()), false); - PriorityLbConfig priorityLbConfig = - new PriorityLbConfig( - ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1), - ImmutableList.of("p0", "p1")); - priorityLb.handleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(ImmutableList.of()) - .setLoadBalancingPolicyConfig(priorityLbConfig) - .build()); - verify(helper, times(1)).updateBalancingState(any(), any()); - } - private void assertLatestConnectivityState(ConnectivityState expectedState) { verify(helper, atLeastOnce()) .updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture()); @@ -734,49 +714,4 @@ public class PriorityLoadBalancerTest { PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); assertThat(pickResult).isEqualTo(PickResult.withNoResult()); } - - private static class FakeLoadBalancerProvider extends LoadBalancerProvider { - - @Override - public boolean isAvailable() { - return true; - } - - @Override - public int getPriority() { - return 5; - } - - @Override - public String getPolicyName() { - return "foo"; - } - - @Override - public LoadBalancer newLoadBalancer(Helper helper) { - return new FakeLoadBalancer(helper); - } - } - - static class FakeLoadBalancer extends LoadBalancer { - - private Helper helper; - - FakeLoadBalancer(Helper helper) { - this.helper = helper; - } - - @Override - public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { - helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL)); - } - - @Override - public void handleNameResolutionError(Status error) { - } - - @Override - public void shutdown() { - } - } }