Revert "xds: prevent concurrent priority LB picker updates (#9363)" (#9554)

This reverts commit bcf5cde7dd.
This commit is contained in:
apolcyn 2022-09-19 08:29:41 -07:00 committed by GitHub
parent e1ad984db3
commit 8925696b3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 106 deletions

View File

@ -58,7 +58,7 @@ final class PriorityLoadBalancer extends LoadBalancer {
private final XdsLogger logger; private final XdsLogger logger;
// Includes all active and deactivated children. Mutable. New entries are only added from priority // Includes all active and deactivated children. Mutable. New entries are only added from priority
// 0 up to the selected priority. An entry is only deleted 15 minutes after its deactivation. // 0 up to the selected priority. An entry is only deleted 15 minutes after the its deactivation.
private final Map<String, ChildLbState> children = new HashMap<>(); private final Map<String, ChildLbState> children = new HashMap<>();
// Following fields are only null initially. // Following fields are only null initially.
@ -70,8 +70,6 @@ final class PriorityLoadBalancer extends LoadBalancer {
@Nullable private String currentPriority; @Nullable private String currentPriority;
private ConnectivityState currentConnectivityState; private ConnectivityState currentConnectivityState;
private SubchannelPicker currentPicker; private SubchannelPicker currentPicker;
// Set to true if currently in the process of handling resolved addresses.
private boolean handlingResolvedAddresses;
PriorityLoadBalancer(Helper helper) { PriorityLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper"); this.helper = checkNotNull(helper, "helper");
@ -84,15 +82,6 @@ final class PriorityLoadBalancer extends LoadBalancer {
@Override @Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
try {
handlingResolvedAddresses = true;
handleResolvedAddressesInternal(resolvedAddresses);
} finally {
handlingResolvedAddresses = false;
}
}
public void handleResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) {
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
this.resolvedAddresses = resolvedAddresses; this.resolvedAddresses = resolvedAddresses;
PriorityLbConfig config = (PriorityLbConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); PriorityLbConfig config = (PriorityLbConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
@ -308,34 +297,32 @@ final class PriorityLoadBalancer extends LoadBalancer {
@Override @Override
public void updateBalancingState(final ConnectivityState newState, public void updateBalancingState(final ConnectivityState newState,
final SubchannelPicker newPicker) { final SubchannelPicker newPicker) {
if (!children.containsKey(priority)) { syncContext.execute(new Runnable() {
return; @Override
} public void run() {
connectivityState = newState; if (!children.containsKey(priority)) {
picker = newPicker; return;
}
if (deletionTimer != null && deletionTimer.isPending()) { connectivityState = newState;
return; picker = newPicker;
} if (deletionTimer != null && deletionTimer.isPending()) {
if (newState.equals(CONNECTING)) { return;
if (!failOverTimer.isPending() && seenReadyOrIdleSinceTransientFailure) { }
failOverTimer = syncContext.schedule(new FailOverTask(), 10, TimeUnit.SECONDS, if (newState.equals(CONNECTING) ) {
executor); if (!failOverTimer.isPending() && seenReadyOrIdleSinceTransientFailure) {
failOverTimer = syncContext.schedule(new FailOverTask(), 10, TimeUnit.SECONDS,
executor);
}
} else if (newState.equals(READY) || newState.equals(IDLE)) {
seenReadyOrIdleSinceTransientFailure = true;
failOverTimer.cancel();
} else if (newState.equals(TRANSIENT_FAILURE)) {
seenReadyOrIdleSinceTransientFailure = false;
failOverTimer.cancel();
}
tryNextPriority();
} }
} else if (newState.equals(READY) || newState.equals(IDLE)) { });
seenReadyOrIdleSinceTransientFailure = true;
failOverTimer.cancel();
} else if (newState.equals(TRANSIENT_FAILURE)) {
seenReadyOrIdleSinceTransientFailure = false;
failOverTimer.cancel();
}
// If we are currently handling newly resolved addresses, let's not try to reconfigure as
// the address handling process will take care of that to provide an atomic config update.
if (handlingResolvedAddresses) {
return;
}
tryNextPriority();
} }
@Override @Override

View File

@ -22,8 +22,8 @@ import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
@ -676,7 +676,7 @@ public class PriorityLoadBalancerTest {
.setAddresses(ImmutableList.<EquivalentAddressGroup>of()) .setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig) .setLoadBalancingPolicyConfig(priorityLbConfig)
.build()); .build());
verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER)); verify(helper, times(2)).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class));
// LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first,
// any further balancing state update should be ignored. // any further balancing state update should be ignored.
@ -686,26 +686,6 @@ public class PriorityLoadBalancerTest {
verifyNoMoreInteractions(helper); verifyNoMoreInteractions(helper);
} }
@Test
public void noDuplicateOverallBalancingStateUpdate() {
FakeLoadBalancerProvider fakeLbProvider = new FakeLoadBalancerProvider();
PriorityChildConfig priorityChildConfig0 =
new PriorityChildConfig(new PolicySelection(fakeLbProvider, new Object()), true);
PriorityChildConfig priorityChildConfig1 =
new PriorityChildConfig(new PolicySelection(fakeLbProvider, new Object()), false);
PriorityLbConfig priorityLbConfig =
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1),
ImmutableList.of("p0", "p1"));
priorityLb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
verify(helper, times(1)).updateBalancingState(any(), any());
}
private void assertLatestConnectivityState(ConnectivityState expectedState) { private void assertLatestConnectivityState(ConnectivityState expectedState) {
verify(helper, atLeastOnce()) verify(helper, atLeastOnce())
.updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture()); .updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture());
@ -734,49 +714,4 @@ public class PriorityLoadBalancerTest {
PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(pickResult).isEqualTo(PickResult.withNoResult()); assertThat(pickResult).isEqualTo(PickResult.withNoResult());
} }
private static class FakeLoadBalancerProvider extends LoadBalancerProvider {
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 5;
}
@Override
public String getPolicyName() {
return "foo";
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new FakeLoadBalancer(helper);
}
}
static class FakeLoadBalancer extends LoadBalancer {
private Helper helper;
FakeLoadBalancer(Helper helper) {
this.helper = helper;
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
}
@Override
public void handleNameResolutionError(Status error) {
}
@Override
public void shutdown() {
}
}
} }