diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index ca481e5691..3af58ef93c 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -216,7 +216,6 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { List addresses = new ArrayList<>(); Map priorityChildConfigs = new HashMap<>(); List priorities = new ArrayList<>(); // totally ordered priority list - Map localityWeights = new HashMap<>(); Status endpointNotFound = Status.OK; for (String cluster : clusters) { @@ -229,7 +228,6 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { addresses.addAll(state.result.addresses); priorityChildConfigs.putAll(state.result.priorityChildConfigs); priorities.addAll(state.result.priorities); - localityWeights.putAll(state.result.localityWeights); } else { endpointNotFound = state.status; } @@ -260,9 +258,6 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { resolvedAddresses.toBuilder() .setLoadBalancingPolicyConfig(childConfig) .setAddresses(Collections.unmodifiableList(addresses)) - .setAttributes(resolvedAddresses.getAttributes().toBuilder() - .set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS, - Collections.unmodifiableMap(localityWeights)).build()) .build()); } @@ -396,7 +391,6 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { } Map localityLbEndpoints = update.localityLbEndpointsMap; - Map localityWeights = new HashMap<>(); List dropOverloads = update.dropPolicies; List addresses = new ArrayList<>(); Map> prioritizedLocalityWeights = new HashMap<>(); @@ -415,6 +409,8 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { Attributes attr = endpoint.eag().getAttributes().toBuilder() .set(InternalXdsAttributes.ATTR_LOCALITY, locality) + .set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT, + localityLbInfo.localityWeight()) .set(InternalXdsAttributes.ATTR_SERVER_WEIGHT, weight) .build(); EquivalentAddressGroup eag = new EquivalentAddressGroup( @@ -429,7 +425,6 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { "Discard locality {0} with 0 healthy endpoints", locality); continue; } - localityWeights.put(locality, localityLbInfo.localityWeight()); if (!prioritizedLocalityWeights.containsKey(priorityName)) { prioritizedLocalityWeights.put(priorityName, new HashMap()); } @@ -450,7 +445,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { status = Status.OK; resolved = true; result = new ClusterResolutionResult(addresses, priorityChildConfigs, - sortedPriorityNames, localityWeights); + sortedPriorityNames); handleEndpointResourceUpdate(); } } @@ -690,23 +685,18 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { private final Map priorityChildConfigs; // List of priority names ordered in descending priorities. private final List priorities; - // Most recent view on how localities in the cluster should be wighted. Only set for EDS - // clusters that support the concept. - private final Map localityWeights; ClusterResolutionResult(List addresses, String priority, PriorityChildConfig config) { this(addresses, Collections.singletonMap(priority, config), - Collections.singletonList(priority), Collections.emptyMap()); + Collections.singletonList(priority)); } ClusterResolutionResult(List addresses, - Map configs, List priorities, - Map localityWeights) { + Map configs, List priorities) { this.addresses = addresses; this.priorityChildConfigs = configs; this.priorities = priorities; - this.localityWeights = localityWeights; } } diff --git a/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java b/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java index 448e5fbd25..bd21a8ac13 100644 --- a/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java +++ b/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java @@ -24,7 +24,6 @@ import io.grpc.NameResolver; import io.grpc.internal.ObjectPool; import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider; import io.grpc.xds.internal.security.SslContextProviderSupplier; -import java.util.Map; /** * Internal attributes used for xDS implementation. Do not use. @@ -58,8 +57,8 @@ public final class InternalXdsAttributes { * Map from localities to their weights. */ @NameResolver.ResolutionResultAttr - static final Attributes.Key> ATTR_LOCALITY_WEIGHTS = - Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.localityWeights"); + static final Attributes.Key ATTR_LOCALITY_WEIGHT = + Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.localityWeight"); /** * Name of the cluster that provides this EquivalentAddressGroup. diff --git a/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java index a961db02cc..b919649262 100644 --- a/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java @@ -21,6 +21,8 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME; import com.google.common.base.MoreObjects; +import io.grpc.Attributes; +import io.grpc.EquivalentAddressGroup; import io.grpc.InternalLogId; import io.grpc.LoadBalancer; import io.grpc.LoadBalancerRegistry; @@ -68,15 +70,33 @@ final class WrrLocalityLoadBalancer extends LoadBalancer { // to produce the weighted target LB config. WrrLocalityConfig wrrLocalityConfig = (WrrLocalityConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); - Map localityWeights = resolvedAddresses.getAttributes() - .get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS); - // Not having locality weights is a misconfiguration, and we have to return with an error. - if (localityWeights == null) { - Status unavailable = - Status.UNAVAILABLE.withDescription("wrr_locality error: no locality weights provided"); - helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable)); - return false; + // A map of locality weights is built up from the locality weight attributes in each address. + Map localityWeights = new HashMap<>(); + for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) { + Attributes eagAttrs = eag.getAttributes(); + Locality locality = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY); + Integer localityWeight = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT); + + if (locality == null) { + helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker( + Status.UNAVAILABLE.withDescription("wrr_locality error: no locality provided"))); + return false; + } + if (localityWeight == null) { + helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker( + Status.UNAVAILABLE.withDescription( + "wrr_locality error: no weight provided for locality " + locality))); + return false; + } + + if (!localityWeights.containsKey(locality)) { + localityWeights.put(locality, localityWeight); + } else if (!localityWeights.get(locality).equals(localityWeight)) { + logger.log(XdsLogLevel.WARNING, + "Locality {0} has both weights {1} and {2}, using weight {1}", locality, + localityWeights.get(locality), localityWeight); + } } // Weighted target LB expects a WeightedPolicySelection for each locality as it will create a @@ -88,14 +108,6 @@ final class WrrLocalityLoadBalancer extends LoadBalancer { wrrLocalityConfig.childPolicy)); } - // Remove the locality weights attribute now that we have consumed it. This is done simply for - // ease of debugging for the unsupported (and unlikely) scenario where WrrLocalityConfig has - // another wrr_locality as the child policy. The missing locality weight attribute would make - // the child wrr_locality fail early. - resolvedAddresses = resolvedAddresses.toBuilder() - .setAttributes(resolvedAddresses.getAttributes().toBuilder() - .discard(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS).build()).build(); - switchLb.switchTo(lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME)); switchLb.handleResolvedAddresses( resolvedAddresses.toBuilder() diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index b4b709d2ae..4d58f88e0e 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -327,8 +327,8 @@ public class ClusterResolverLoadBalancerTest { "least_request_experimental"); assertThat( - childBalancer.attributes.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).containsEntry( - locality1, 100); + childBalancer.addresses.get(0).getAttributes() + .get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT)).isEqualTo(100); } @Test @@ -410,8 +410,8 @@ public class ClusterResolverLoadBalancerTest { "least_request_experimental"); assertThat( - childBalancer.attributes.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).containsEntry( - locality1, 100); + childBalancer.addresses.get(0).getAttributes() + .get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT)).isEqualTo(100); } @@ -507,11 +507,20 @@ public class ClusterResolverLoadBalancerTest { assertThat(wrrLocalityConfig3.childPolicy.getProvider().getPolicyName()).isEqualTo( "round_robin"); - Map localityWeights = childBalancer.attributes.get( - InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS); - assertThat(localityWeights).containsEntry(locality1, 70); - assertThat(localityWeights).containsEntry(locality2, 10); - assertThat(localityWeights).containsEntry(locality3, 20); + for (EquivalentAddressGroup eag : childBalancer.addresses) { + if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality1) { + assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT)) + .isEqualTo(70); + } + if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality2) { + assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT)) + .isEqualTo(10); + } + if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality3) { + assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT)) + .isEqualTo(20); + } + } } @SuppressWarnings("unchecked") @@ -687,9 +696,9 @@ public class ClusterResolverLoadBalancerTest { ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2)); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - Map localityWeights = childBalancer.attributes.get( - InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS); - assertThat(localityWeights.keySet()).containsExactly(locality2); + for (EquivalentAddressGroup eag : childBalancer.addresses) { + assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY)).isEqualTo(locality2); + } } @Test @@ -1315,7 +1324,6 @@ public class ClusterResolverLoadBalancerTest { private final Helper helper; private List addresses; private Object config; - private Attributes attributes; private Status upstreamError; private boolean shutdown; @@ -1328,7 +1336,6 @@ public class ClusterResolverLoadBalancerTest { public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { addresses = resolvedAddresses.getAddresses(); config = resolvedAddresses.getLoadBalancingPolicyConfig(); - attributes = resolvedAddresses.getAttributes(); return true; } diff --git a/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java index 29777a5284..344876aa34 100644 --- a/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java @@ -25,7 +25,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.testing.EqualsTester; import io.grpc.Attributes; import io.grpc.ConnectivityState; @@ -44,7 +43,9 @@ import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; import io.grpc.xds.WrrLocalityLoadBalancer.WrrLocalityConfig; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.net.SocketAddress; -import java.util.Map; +import java.util.Collections; +import java.util.List; +import java.util.Objects; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -74,8 +75,6 @@ public class WrrLocalityLoadBalancerTest { private LoadBalancer mockChildLb; @Mock private Helper mockHelper; - @Mock - private SocketAddress mockSocketAddress; @Captor private ArgumentCaptor resolvedAddressesCaptor; @@ -84,8 +83,6 @@ public class WrrLocalityLoadBalancerTest { @Captor private ArgumentCaptor errorPickerCaptor; - private final EquivalentAddressGroup eag = new EquivalentAddressGroup(mockSocketAddress); - private WrrLocalityLoadBalancer loadBalancer; private LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry(); @@ -124,8 +121,10 @@ public class WrrLocalityLoadBalancerTest { // The child config is delivered wrapped in the wrr_locality config and the locality weights // in a ResolvedAddresses attribute. WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy); - Map localityWeights = ImmutableMap.of(localityOne, 1, localityTwo, 2); - deliverAddresses(wlConfig, localityWeights); + deliverAddresses(wlConfig, + ImmutableList.of( + makeAddress("addr1", localityOne, 1), + makeAddress("addr2", localityTwo, 2))); // Assert that the child policy and the locality weights were correctly mapped to a // WeightedTargetConfig. @@ -148,7 +147,8 @@ public class WrrLocalityLoadBalancerTest { // The child config is delivered wrapped in the wrr_locality config and the locality weights // in a ResolvedAddresses attribute. WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy); - deliverAddresses(wlConfig, null); + deliverAddresses(wlConfig, ImmutableList.of( + makeAddress("addr", Locality.create("test-region", "test-zone", "test-subzone"), null))); // With no locality weights, we should get a TRANSIENT_FAILURE. verify(mockHelper).getAuthority(); @@ -170,8 +170,8 @@ public class WrrLocalityLoadBalancerTest { @Test public void handleNameResolutionError_withChildLb() { deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockChildProvider, null)), - ImmutableMap.of( - Locality.create("region", "zone", "subzone"), 1)); + ImmutableList.of( + makeAddress("addr1", Locality.create("test-region1", "test-zone", "test-subzone"), 1))); loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED); verify(mockHelper, never()).updateBalancingState(isA(ConnectivityState.class), @@ -181,25 +181,25 @@ public class WrrLocalityLoadBalancerTest { @Test public void localityWeightAttributeNotPropagated() { - Locality locality = Locality.create("region1", "zone1", "subzone1"); PolicySelection childPolicy = new PolicySelection(mockChildProvider, null); WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy); - Map localityWeights = ImmutableMap.of(locality, 1); - deliverAddresses(wlConfig, localityWeights); + deliverAddresses(wlConfig, ImmutableList.of( + makeAddress("addr1", Locality.create("test-region1", "test-zone", "test-subzone"), 1))); // Assert that the child policy and the locality weights were correctly mapped to a // WeightedTargetConfig. verify(mockWeightedTargetLb).handleResolvedAddresses(resolvedAddressesCaptor.capture()); - assertThat(resolvedAddressesCaptor.getValue().getAttributes() - .get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).isNull(); + + //assertThat(resolvedAddressesCaptor.getValue().getAttributes() + // .get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).isNull(); } @Test public void shutdown() { deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockChildProvider, null)), - ImmutableMap.of( - Locality.create("region", "zone", "subzone"), 1)); + ImmutableList.of( + makeAddress("addr", Locality.create("test-region", "test-zone", "test-subzone"), 1))); loadBalancer.shutdown(); verify(mockWeightedTargetLb).shutdown(); @@ -218,11 +218,55 @@ public class WrrLocalityLoadBalancerTest { .testEquals(); } - private void deliverAddresses(WrrLocalityConfig config, Map localityWeights) { + private void deliverAddresses(WrrLocalityConfig config, List addresses) { loadBalancer.handleResolvedAddresses( - ResolvedAddresses.newBuilder().setAddresses(ImmutableList.of(eag)).setAttributes( - Attributes.newBuilder() - .set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS, localityWeights).build()) - .setLoadBalancingPolicyConfig(config).build()); + ResolvedAddresses.newBuilder().setAddresses(addresses).setLoadBalancingPolicyConfig(config) + .build()); + } + + /** + * Create a locality-labeled address. + */ + private static EquivalentAddressGroup makeAddress(final String name, Locality locality, + Integer localityWeight) { + class FakeSocketAddress extends SocketAddress { + private final String name; + + private FakeSocketAddress(String name) { + this.name = name; + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof FakeSocketAddress)) { + return false; + } + FakeSocketAddress that = (FakeSocketAddress) o; + return Objects.equals(name, that.name); + } + + @Override + public String toString() { + return name; + } + } + + Attributes.Builder attrBuilder = Attributes.newBuilder() + .set(InternalXdsAttributes.ATTR_LOCALITY, locality); + if (localityWeight != null) { + attrBuilder.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT, localityWeight); + } + + EquivalentAddressGroup eag = new EquivalentAddressGroup(new FakeSocketAddress(name), + attrBuilder.build()); + return AddressFilter.setPathFilter(eag, Collections.singletonList(locality.toString())); } }