xds: Support localities in multiple priorities (#9683)

Additional logic to support for the same locality appearing under
multiple priorities.
This commit is contained in:
Terry Wilson 2022-11-29 13:15:28 -08:00 committed by GitHub
parent 548d3cac38
commit 5cf54f3178
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 123 additions and 71 deletions

View File

@ -216,7 +216,6 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
List<EquivalentAddressGroup> addresses = new ArrayList<>();
Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
List<String> priorities = new ArrayList<>(); // totally ordered priority list
Map<Locality, Integer> localityWeights = new HashMap<>();
Status endpointNotFound = Status.OK;
for (String cluster : clusters) {
@ -229,7 +228,6 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
addresses.addAll(state.result.addresses);
priorityChildConfigs.putAll(state.result.priorityChildConfigs);
priorities.addAll(state.result.priorities);
localityWeights.putAll(state.result.localityWeights);
} else {
endpointNotFound = state.status;
}
@ -260,9 +258,6 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(childConfig)
.setAddresses(Collections.unmodifiableList(addresses))
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS,
Collections.unmodifiableMap(localityWeights)).build())
.build());
}
@ -396,7 +391,6 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
}
Map<Locality, LocalityLbEndpoints> localityLbEndpoints =
update.localityLbEndpointsMap;
Map<Locality, Integer> localityWeights = new HashMap<>();
List<DropOverload> dropOverloads = update.dropPolicies;
List<EquivalentAddressGroup> addresses = new ArrayList<>();
Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
@ -415,6 +409,8 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
Attributes attr =
endpoint.eag().getAttributes().toBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY, locality)
.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT,
localityLbInfo.localityWeight())
.set(InternalXdsAttributes.ATTR_SERVER_WEIGHT, weight)
.build();
EquivalentAddressGroup eag = new EquivalentAddressGroup(
@ -429,7 +425,6 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
"Discard locality {0} with 0 healthy endpoints", locality);
continue;
}
localityWeights.put(locality, localityLbInfo.localityWeight());
if (!prioritizedLocalityWeights.containsKey(priorityName)) {
prioritizedLocalityWeights.put(priorityName, new HashMap<Locality, Integer>());
}
@ -450,7 +445,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
status = Status.OK;
resolved = true;
result = new ClusterResolutionResult(addresses, priorityChildConfigs,
sortedPriorityNames, localityWeights);
sortedPriorityNames);
handleEndpointResourceUpdate();
}
}
@ -690,23 +685,18 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
private final Map<String, PriorityChildConfig> priorityChildConfigs;
// List of priority names ordered in descending priorities.
private final List<String> priorities;
// Most recent view on how localities in the cluster should be wighted. Only set for EDS
// clusters that support the concept.
private final Map<Locality, Integer> localityWeights;
ClusterResolutionResult(List<EquivalentAddressGroup> addresses, String priority,
PriorityChildConfig config) {
this(addresses, Collections.singletonMap(priority, config),
Collections.singletonList(priority), Collections.emptyMap());
Collections.singletonList(priority));
}
ClusterResolutionResult(List<EquivalentAddressGroup> addresses,
Map<String, PriorityChildConfig> configs, List<String> priorities,
Map<Locality, Integer> localityWeights) {
Map<String, PriorityChildConfig> configs, List<String> priorities) {
this.addresses = addresses;
this.priorityChildConfigs = configs;
this.priorities = priorities;
this.localityWeights = localityWeights;
}
}

View File

@ -24,7 +24,6 @@ import io.grpc.NameResolver;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
import io.grpc.xds.internal.security.SslContextProviderSupplier;
import java.util.Map;
/**
* Internal attributes used for xDS implementation. Do not use.
@ -58,8 +57,8 @@ public final class InternalXdsAttributes {
* Map from localities to their weights.
*/
@NameResolver.ResolutionResultAttr
static final Attributes.Key<Map<Locality, Integer>> ATTR_LOCALITY_WEIGHTS =
Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.localityWeights");
static final Attributes.Key<Integer> ATTR_LOCALITY_WEIGHT =
Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.localityWeight");
/**
* Name of the cluster that provides this EquivalentAddressGroup.

View File

@ -21,6 +21,8 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;
import com.google.common.base.MoreObjects;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerRegistry;
@ -68,16 +70,34 @@ final class WrrLocalityLoadBalancer extends LoadBalancer {
// to produce the weighted target LB config.
WrrLocalityConfig wrrLocalityConfig
= (WrrLocalityConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
Map<Locality, Integer> localityWeights = resolvedAddresses.getAttributes()
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);
// Not having locality weights is a misconfiguration, and we have to return with an error.
if (localityWeights == null) {
Status unavailable =
Status.UNAVAILABLE.withDescription("wrr_locality error: no locality weights provided");
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable));
// A map of locality weights is built up from the locality weight attributes in each address.
Map<Locality, Integer> localityWeights = new HashMap<>();
for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) {
Attributes eagAttrs = eag.getAttributes();
Locality locality = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY);
Integer localityWeight = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT);
if (locality == null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(
Status.UNAVAILABLE.withDescription("wrr_locality error: no locality provided")));
return false;
}
if (localityWeight == null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(
Status.UNAVAILABLE.withDescription(
"wrr_locality error: no weight provided for locality " + locality)));
return false;
}
if (!localityWeights.containsKey(locality)) {
localityWeights.put(locality, localityWeight);
} else if (!localityWeights.get(locality).equals(localityWeight)) {
logger.log(XdsLogLevel.WARNING,
"Locality {0} has both weights {1} and {2}, using weight {1}", locality,
localityWeights.get(locality), localityWeight);
}
}
// Weighted target LB expects a WeightedPolicySelection for each locality as it will create a
// child LB for each.
@ -88,14 +108,6 @@ final class WrrLocalityLoadBalancer extends LoadBalancer {
wrrLocalityConfig.childPolicy));
}
// Remove the locality weights attribute now that we have consumed it. This is done simply for
// ease of debugging for the unsupported (and unlikely) scenario where WrrLocalityConfig has
// another wrr_locality as the child policy. The missing locality weight attribute would make
// the child wrr_locality fail early.
resolvedAddresses = resolvedAddresses.toBuilder()
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
.discard(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS).build()).build();
switchLb.switchTo(lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME));
switchLb.handleResolvedAddresses(
resolvedAddresses.toBuilder()

View File

@ -327,8 +327,8 @@ public class ClusterResolverLoadBalancerTest {
"least_request_experimental");
assertThat(
childBalancer.attributes.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).containsEntry(
locality1, 100);
childBalancer.addresses.get(0).getAttributes()
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT)).isEqualTo(100);
}
@Test
@ -410,8 +410,8 @@ public class ClusterResolverLoadBalancerTest {
"least_request_experimental");
assertThat(
childBalancer.attributes.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).containsEntry(
locality1, 100);
childBalancer.addresses.get(0).getAttributes()
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT)).isEqualTo(100);
}
@ -507,11 +507,20 @@ public class ClusterResolverLoadBalancerTest {
assertThat(wrrLocalityConfig3.childPolicy.getProvider().getPolicyName()).isEqualTo(
"round_robin");
Map<Locality, Integer> localityWeights = childBalancer.attributes.get(
InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);
assertThat(localityWeights).containsEntry(locality1, 70);
assertThat(localityWeights).containsEntry(locality2, 10);
assertThat(localityWeights).containsEntry(locality3, 20);
for (EquivalentAddressGroup eag : childBalancer.addresses) {
if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality1) {
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT))
.isEqualTo(70);
}
if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality2) {
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT))
.isEqualTo(10);
}
if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality3) {
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT))
.isEqualTo(20);
}
}
}
@SuppressWarnings("unchecked")
@ -687,9 +696,9 @@ public class ClusterResolverLoadBalancerTest {
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
Map<Locality, Integer> localityWeights = childBalancer.attributes.get(
InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);
assertThat(localityWeights.keySet()).containsExactly(locality2);
for (EquivalentAddressGroup eag : childBalancer.addresses) {
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY)).isEqualTo(locality2);
}
}
@Test
@ -1315,7 +1324,6 @@ public class ClusterResolverLoadBalancerTest {
private final Helper helper;
private List<EquivalentAddressGroup> addresses;
private Object config;
private Attributes attributes;
private Status upstreamError;
private boolean shutdown;
@ -1328,7 +1336,6 @@ public class ClusterResolverLoadBalancerTest {
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
addresses = resolvedAddresses.getAddresses();
config = resolvedAddresses.getLoadBalancingPolicyConfig();
attributes = resolvedAddresses.getAttributes();
return true;
}

View File

@ -25,7 +25,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.testing.EqualsTester;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
@ -44,7 +43,9 @@ import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
import io.grpc.xds.WrrLocalityLoadBalancer.WrrLocalityConfig;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -74,8 +75,6 @@ public class WrrLocalityLoadBalancerTest {
private LoadBalancer mockChildLb;
@Mock
private Helper mockHelper;
@Mock
private SocketAddress mockSocketAddress;
@Captor
private ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor;
@ -84,8 +83,6 @@ public class WrrLocalityLoadBalancerTest {
@Captor
private ArgumentCaptor<SubchannelPicker> errorPickerCaptor;
private final EquivalentAddressGroup eag = new EquivalentAddressGroup(mockSocketAddress);
private WrrLocalityLoadBalancer loadBalancer;
private LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry();
@ -124,8 +121,10 @@ public class WrrLocalityLoadBalancerTest {
// The child config is delivered wrapped in the wrr_locality config and the locality weights
// in a ResolvedAddresses attribute.
WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy);
Map<Locality, Integer> localityWeights = ImmutableMap.of(localityOne, 1, localityTwo, 2);
deliverAddresses(wlConfig, localityWeights);
deliverAddresses(wlConfig,
ImmutableList.of(
makeAddress("addr1", localityOne, 1),
makeAddress("addr2", localityTwo, 2)));
// Assert that the child policy and the locality weights were correctly mapped to a
// WeightedTargetConfig.
@ -148,7 +147,8 @@ public class WrrLocalityLoadBalancerTest {
// The child config is delivered wrapped in the wrr_locality config and the locality weights
// in a ResolvedAddresses attribute.
WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy);
deliverAddresses(wlConfig, null);
deliverAddresses(wlConfig, ImmutableList.of(
makeAddress("addr", Locality.create("test-region", "test-zone", "test-subzone"), null)));
// With no locality weights, we should get a TRANSIENT_FAILURE.
verify(mockHelper).getAuthority();
@ -170,8 +170,8 @@ public class WrrLocalityLoadBalancerTest {
@Test
public void handleNameResolutionError_withChildLb() {
deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockChildProvider, null)),
ImmutableMap.of(
Locality.create("region", "zone", "subzone"), 1));
ImmutableList.of(
makeAddress("addr1", Locality.create("test-region1", "test-zone", "test-subzone"), 1)));
loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED);
verify(mockHelper, never()).updateBalancingState(isA(ConnectivityState.class),
@ -181,25 +181,25 @@ public class WrrLocalityLoadBalancerTest {
@Test
public void localityWeightAttributeNotPropagated() {
Locality locality = Locality.create("region1", "zone1", "subzone1");
PolicySelection childPolicy = new PolicySelection(mockChildProvider, null);
WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy);
Map<Locality, Integer> localityWeights = ImmutableMap.of(locality, 1);
deliverAddresses(wlConfig, localityWeights);
deliverAddresses(wlConfig, ImmutableList.of(
makeAddress("addr1", Locality.create("test-region1", "test-zone", "test-subzone"), 1)));
// Assert that the child policy and the locality weights were correctly mapped to a
// WeightedTargetConfig.
verify(mockWeightedTargetLb).handleResolvedAddresses(resolvedAddressesCaptor.capture());
assertThat(resolvedAddressesCaptor.getValue().getAttributes()
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).isNull();
//assertThat(resolvedAddressesCaptor.getValue().getAttributes()
// .get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).isNull();
}
@Test
public void shutdown() {
deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockChildProvider, null)),
ImmutableMap.of(
Locality.create("region", "zone", "subzone"), 1));
ImmutableList.of(
makeAddress("addr", Locality.create("test-region", "test-zone", "test-subzone"), 1)));
loadBalancer.shutdown();
verify(mockWeightedTargetLb).shutdown();
@ -218,11 +218,55 @@ public class WrrLocalityLoadBalancerTest {
.testEquals();
}
private void deliverAddresses(WrrLocalityConfig config, Map<Locality, Integer> localityWeights) {
private void deliverAddresses(WrrLocalityConfig config, List<EquivalentAddressGroup> addresses) {
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(ImmutableList.of(eag)).setAttributes(
Attributes.newBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS, localityWeights).build())
.setLoadBalancingPolicyConfig(config).build());
ResolvedAddresses.newBuilder().setAddresses(addresses).setLoadBalancingPolicyConfig(config)
.build());
}
/**
* Create a locality-labeled address.
*/
private static EquivalentAddressGroup makeAddress(final String name, Locality locality,
Integer localityWeight) {
class FakeSocketAddress extends SocketAddress {
private final String name;
private FakeSocketAddress(String name) {
this.name = name;
}
@Override
public int hashCode() {
return Objects.hash(name);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof FakeSocketAddress)) {
return false;
}
FakeSocketAddress that = (FakeSocketAddress) o;
return Objects.equals(name, that.name);
}
@Override
public String toString() {
return name;
}
}
Attributes.Builder attrBuilder = Attributes.newBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY, locality);
if (localityWeight != null) {
attrBuilder.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT, localityWeight);
}
EquivalentAddressGroup eag = new EquivalentAddressGroup(new FakeSocketAddress(name),
attrBuilder.build());
return AddressFilter.setPathFilter(eag, Collections.singletonList(locality.toString()));
}
}