xds: Delay priority LB updates from children (#9670)

If a child policy triggers an update to the parent priority policy
it will be ignored if an update is already in process.

This is the second attempt to make this change, the first one caused a
problem with the ring hash LB. A new test that uses actual control plane
and data plane servers is now included to prove the issue no longer
appears.
This commit is contained in:
Terry Wilson 2022-11-04 09:17:17 -07:00 committed by GitHub
parent ba182c3e02
commit 0d44203bdc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 133 additions and 27 deletions

View File

@ -58,7 +58,7 @@ final class PriorityLoadBalancer extends LoadBalancer {
private final XdsLogger logger; private final XdsLogger logger;
// Includes all active and deactivated children. Mutable. New entries are only added from priority // Includes all active and deactivated children. Mutable. New entries are only added from priority
// 0 up to the selected priority. An entry is only deleted 15 minutes after the its deactivation. // 0 up to the selected priority. An entry is only deleted 15 minutes after its deactivation.
private final Map<String, ChildLbState> children = new HashMap<>(); private final Map<String, ChildLbState> children = new HashMap<>();
// Following fields are only null initially. // Following fields are only null initially.
@ -70,6 +70,8 @@ final class PriorityLoadBalancer extends LoadBalancer {
@Nullable private String currentPriority; @Nullable private String currentPriority;
private ConnectivityState currentConnectivityState; private ConnectivityState currentConnectivityState;
private SubchannelPicker currentPicker; private SubchannelPicker currentPicker;
// Set to true if currently in the process of handling resolved addresses.
private boolean handlingResolvedAddresses;
PriorityLoadBalancer(Helper helper) { PriorityLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper"); this.helper = checkNotNull(helper, "helper");
@ -94,11 +96,13 @@ final class PriorityLoadBalancer extends LoadBalancer {
children.get(priority).deactivate(); children.get(priority).deactivate();
} }
} }
handlingResolvedAddresses = true;
for (String priority : priorityNames) { for (String priority : priorityNames) {
if (children.containsKey(priority)) { if (children.containsKey(priority)) {
children.get(priority).updateResolvedAddresses(); children.get(priority).updateResolvedAddresses();
} }
} }
handlingResolvedAddresses = false;
tryNextPriority(); tryNextPriority();
return true; return true;
} }
@ -134,8 +138,11 @@ final class PriorityLoadBalancer extends LoadBalancer {
ChildLbState child = ChildLbState child =
new ChildLbState(priority, priorityConfigs.get(priority).ignoreReresolution); new ChildLbState(priority, priorityConfigs.get(priority).ignoreReresolution);
children.put(priority, child); children.put(priority, child);
child.updateResolvedAddresses();
updateOverallState(priority, CONNECTING, BUFFER_PICKER); updateOverallState(priority, CONNECTING, BUFFER_PICKER);
// Calling the child's updateResolvedAddresses() can result in tryNextPriority() being
// called recursively. We need to be sure to be done with processing here before it is
// called.
child.updateResolvedAddresses();
return; // Give priority i time to connect. return; // Give priority i time to connect.
} }
ChildLbState child = children.get(priority); ChildLbState child = children.get(priority);
@ -298,14 +305,12 @@ final class PriorityLoadBalancer extends LoadBalancer {
@Override @Override
public void updateBalancingState(final ConnectivityState newState, public void updateBalancingState(final ConnectivityState newState,
final SubchannelPicker newPicker) { final SubchannelPicker newPicker) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!children.containsKey(priority)) { if (!children.containsKey(priority)) {
return; return;
} }
connectivityState = newState; connectivityState = newState;
picker = newPicker; picker = newPicker;
if (deletionTimer != null && deletionTimer.isPending()) { if (deletionTimer != null && deletionTimer.isPending()) {
return; return;
} }
@ -321,9 +326,12 @@ final class PriorityLoadBalancer extends LoadBalancer {
seenReadyOrIdleSinceTransientFailure = false; seenReadyOrIdleSinceTransientFailure = false;
failOverTimer.cancel(); failOverTimer.cancel();
} }
// If we are currently handling newly resolved addresses, let's not try to reconfigure as
// the address handling process will take care of that to provide an atomic config update.
if (!handlingResolvedAddresses) {
tryNextPriority(); tryNextPriority();
} }
});
} }
@Override @Override

View File

@ -24,6 +24,7 @@ import com.github.xds.type.v3.TypedStruct;
import com.google.protobuf.Any; import com.google.protobuf.Any;
import com.google.protobuf.Struct; import com.google.protobuf.Struct;
import com.google.protobuf.Value; import com.google.protobuf.Value;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy; import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy;
import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy.Policy; import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy.Policy;
import io.envoyproxy.envoy.config.core.v3.TypedExtensionConfig; import io.envoyproxy.envoy.config.core.v3.TypedExtensionConfig;
@ -151,4 +152,24 @@ public class FakeControlPlaneXdsIntegrationTest {
}; };
} }
} }
/**
* Basic test to make sure RING_HASH configuration works.
*/
@Test
public void pingPong_ringHash() {
controlPlane.setCdsConfig(
ControlPlaneRule.buildCluster().toBuilder()
.setLbPolicy(LbPolicy.RING_HASH).build());
ManagedChannel channel = dataPlane.getManagedChannel();
SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(
channel);
SimpleRequest request = SimpleRequest.newBuilder()
.build();
SimpleResponse goldenResponse = SimpleResponse.newBuilder()
.setResponseMessage("Hi, xDS!")
.build();
assertEquals(goldenResponse, blockingStub.unaryRpc(request));
}
} }

View File

@ -22,6 +22,7 @@ import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA; import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
@ -686,6 +687,37 @@ public class PriorityLoadBalancerTest {
verifyNoMoreInteractions(helper); verifyNoMoreInteractions(helper);
} }
@Test
public void noDuplicateOverallBalancingStateUpdate() {
FakeLoadBalancerProvider fakeLbProvider = new FakeLoadBalancerProvider();
PriorityChildConfig priorityChildConfig0 =
new PriorityChildConfig(new PolicySelection(fakeLbProvider, new Object()), true);
PriorityChildConfig priorityChildConfig1 =
new PriorityChildConfig(new PolicySelection(fakeLbProvider, new Object()), false);
PriorityLbConfig priorityLbConfig =
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0),
ImmutableList.of("p0"));
priorityLb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
priorityLbConfig =
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1),
ImmutableList.of("p0", "p1"));
priorityLb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
verify(helper, times(6)).updateBalancingState(any(), any());
}
private void assertLatestConnectivityState(ConnectivityState expectedState) { private void assertLatestConnectivityState(ConnectivityState expectedState) {
verify(helper, atLeastOnce()) verify(helper, atLeastOnce())
.updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture()); .updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture());
@ -714,4 +746,49 @@ public class PriorityLoadBalancerTest {
PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(pickResult).isEqualTo(PickResult.withNoResult()); assertThat(pickResult).isEqualTo(PickResult.withNoResult());
} }
private static class FakeLoadBalancerProvider extends LoadBalancerProvider {
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 5;
}
@Override
public String getPolicyName() {
return "foo";
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new FakeLoadBalancer(helper);
}
}
static class FakeLoadBalancer extends LoadBalancer {
private Helper helper;
FakeLoadBalancer(Helper helper) {
this.helper = helper;
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
}
@Override
public void handleNameResolutionError(Status error) {
}
@Override
public void shutdown() {
}
}
} }