xds: Plumb locality in xds_cluster_impl and weighted_target

As part of gRFC A78:

> To support the locality label in the WRR metrics, we will extend the
> `weighted_target` LB policy (see A28) to define a resolver attribute
> that indicates the name of its child. This attribute will be passed
> down to each of its children with the appropriate value, so that any
> LB policy that sits underneath the `weighted_target` policy will be
> able to use it.

xds_cluster_impl is involved because it uses the child names in the
AddressFilter, which must match the names used by weighted_target.
Instead of using Locality.toString() in multiple policies and assuming
the policies agree, we now have xds_cluster_impl decide the locality's
name and pass it down explicitly. This allows us to change the name
format to match gRFC A78:

> If locality information is available, the value of this label will be
> of the form `{region="${REGION}", zone="${ZONE}",
> sub_zone="${SUB_ZONE}"}`, where `${REGION}`, `${ZONE}`, and
> `${SUB_ZONE}` are replaced with the actual values. If no locality
> information is available, the label will be set to the empty string.
This commit is contained in:
Eric Anderson 2024-04-27 07:52:53 -07:00
parent d6f1a9d569
commit 8f81bd2886
9 changed files with 115 additions and 48 deletions

View File

@ -76,6 +76,8 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
private static final Attributes.Key<ClusterLocalityStats> ATTR_CLUSTER_LOCALITY_STATS =
Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityStats");
private static final Attributes.Key<String> ATTR_CLUSTER_LOCALITY_NAME =
Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityName");
private final XdsLogger logger;
private final Helper helper;
@ -209,11 +211,14 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
List<EquivalentAddressGroup> addresses = withAdditionalAttributes(args.getAddresses());
Locality locality = args.getAddresses().get(0).getAttributes().get(
InternalXdsAttributes.ATTR_LOCALITY); // all addresses should be in the same locality
String localityName = args.getAddresses().get(0).getAttributes().get(
InternalXdsAttributes.ATTR_LOCALITY_NAME);
// Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
// attributes with its locality, including endpoints in LOGICAL_DNS clusters.
// In case of not (which really shouldn't), loads are aggregated under an empty locality.
if (locality == null) {
locality = Locality.create("", "", "");
localityName = "";
}
final ClusterLocalityStats localityStats =
(lrsServerInfo == null)
@ -221,8 +226,10 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
: xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
edsServiceName, locality);
Attributes attrs = args.getAttributes().toBuilder().set(
ATTR_CLUSTER_LOCALITY_STATS, localityStats).build();
Attributes attrs = args.getAttributes().toBuilder()
.set(ATTR_CLUSTER_LOCALITY_STATS, localityStats)
.set(ATTR_CLUSTER_LOCALITY_NAME, localityName)
.build();
args = args.toBuilder().setAddresses(addresses).setAttributes(attrs).build();
final Subchannel subchannel = delegate().createSubchannel(args);
@ -344,6 +351,10 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
final ClusterLocalityStats stats =
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_STATS);
if (stats != null) {
String localityName =
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_NAME);
args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName);
ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
stats, inFlights, result.getStreamTracerFactory());
ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()

View File

@ -412,17 +412,18 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
if (endpoint.loadBalancingWeight() != 0) {
weight *= endpoint.loadBalancingWeight();
}
String localityName = localityName(locality);
Attributes attr =
endpoint.eag().getAttributes().toBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY, locality)
.set(InternalXdsAttributes.ATTR_LOCALITY_NAME, localityName)
.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT,
localityLbInfo.localityWeight())
.set(InternalXdsAttributes.ATTR_SERVER_WEIGHT, weight)
.build();
EquivalentAddressGroup eag = new EquivalentAddressGroup(
endpoint.eag().getAddresses(), attr);
eag = AddressFilter.setPathFilter(
eag, Arrays.asList(priorityName, localityName(locality)));
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
addresses.add(eag);
}
}
@ -612,11 +613,13 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
// No weight attribute is attached, all endpoint-level LB policy should be able
// to handle such it.
Attributes attr = eag.getAttributes().toBuilder().set(
InternalXdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY).build();
String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
Attributes attr = eag.getAttributes().toBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
.set(InternalXdsAttributes.ATTR_LOCALITY_NAME, localityName)
.build();
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
eag = AddressFilter.setPathFilter(
eag, Arrays.asList(priorityName, LOGICAL_DNS_CLUSTER_LOCALITY.toString()));
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
addresses.add(eag);
}
PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
@ -844,6 +847,9 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
* across all localities in all clusters.
*/
private static String localityName(Locality locality) {
return locality.toString();
return "{region=\"" + locality.region()
+ "\", zone=\"" + locality.zone()
+ "\", sub_zone=\"" + locality.subZone()
+ "\"}";
}
}

View File

@ -77,6 +77,13 @@ public final class InternalXdsAttributes {
static final Attributes.Key<Locality> ATTR_LOCALITY =
Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.locality");
/**
* The name of the locality that this EquivalentAddressGroup is in.
*/
@EquivalentAddressGroup.Attr
static final Attributes.Key<String> ATTR_LOCALITY_NAME =
Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.localityName");
/**
* Endpoint weight for load balancing purposes.
*/

View File

@ -23,6 +23,7 @@ import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import com.google.common.collect.ImmutableMap;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
@ -42,6 +43,8 @@ import javax.annotation.Nullable;
/** Load balancer for weighted_target policy. */
final class WeightedTargetLoadBalancer extends LoadBalancer {
public static final Attributes.Key<String> CHILD_NAME =
Attributes.Key.create("io.grpc.xds.WeightedTargetLoadBalancer.CHILD_NAME");
private final XdsLogger logger;
private final Map<String, GracefulSwitchLoadBalancer> childBalancers = new HashMap<>();
@ -95,6 +98,9 @@ final class WeightedTargetLoadBalancer extends LoadBalancer {
resolvedAddresses.toBuilder()
.setAddresses(AddressFilter.filter(resolvedAddresses.getAddresses(), targetName))
.setLoadBalancingPolicyConfig(targets.get(targetName).policySelection.getConfig())
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
.set(CHILD_NAME, targetName)
.build())
.build());
}

View File

@ -31,7 +31,6 @@ import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
import io.grpc.xds.client.Locality;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.util.HashMap;
@ -73,10 +72,10 @@ final class WrrLocalityLoadBalancer extends LoadBalancer {
= (WrrLocalityConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
// A map of locality weights is built up from the locality weight attributes in each address.
Map<Locality, Integer> localityWeights = new HashMap<>();
Map<String, Integer> localityWeights = new HashMap<>();
for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) {
Attributes eagAttrs = eag.getAttributes();
Locality locality = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY);
String locality = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY_NAME);
Integer localityWeight = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT);
if (locality == null) {
@ -106,8 +105,8 @@ final class WrrLocalityLoadBalancer extends LoadBalancer {
// Weighted target LB expects a WeightedPolicySelection for each locality as it will create a
// child LB for each.
Map<String, WeightedPolicySelection> weightedPolicySelections = new HashMap<>();
for (Locality locality : localityWeights.keySet()) {
weightedPolicySelections.put(locality.toString(),
for (String locality : localityWeights.keySet()) {
weightedPolicySelections.put(locality,
new WeightedPolicySelection(localityWeights.get(locality),
wrrLocalityConfig.childPolicy));
}

View File

@ -19,12 +19,14 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.github.xds.data.orca.v3.OrcaLoadReport;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
@ -32,6 +34,7 @@ import io.grpc.InsecureChannelCredentials;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickDetailsConsumer;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
@ -45,8 +48,10 @@ import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.PickSubchannelArgsImpl;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.testing.TestMethodDescriptors;
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.EnvoyServerProtoData.DownstreamTlsContext;
@ -141,6 +146,9 @@ public class ClusterImplLoadBalancerTest {
}
};
private final Helper helper = new FakeLbHelper();
private PickSubchannelArgs pickSubchannelArgs = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT,
new PickDetailsConsumer() {});
@Mock
private ThreadSafeRandom mockRandom;
private int xdsClientRefs;
@ -218,7 +226,7 @@ public class ClusterImplLoadBalancerTest {
public void nameResolutionError_beforeChildPolicyInstantiated_returnErrorPickerToUpstream() {
loadBalancer.handleNameResolutionError(Status.UNIMPLEMENTED.withDescription("not found"));
assertThat(currentState).isEqualTo(ConnectivityState.TRANSIENT_FAILURE);
PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs);
assertThat(result.getStatus().isOk()).isFalse();
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNIMPLEMENTED);
assertThat(result.getStatus().getDescription()).isEqualTo("not found");
@ -243,6 +251,32 @@ public class ClusterImplLoadBalancerTest {
.isEqualTo("cannot reach server");
}
@Test
public void pick_addsLocalityLabel() {
LoadBalancerProvider weightedTargetProvider = new WeightedTargetLoadBalancerProvider();
WeightedTargetConfig weightedTargetConfig =
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
null, Collections.<DropOverload>emptyList(),
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers);
Subchannel subchannel = leafBalancer.helper.createSubchannel(
CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build());
leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY);
assertThat(currentState).isEqualTo(ConnectivityState.READY);
PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class);
pickSubchannelArgs = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT, detailsConsumer);
PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs);
assertThat(result.getStatus().isOk()).isTrue();
// The value will be determined by the parent policy, so can be different than the value used in
// makeAddress() for the test.
verify(detailsConsumer).addOptionalLabel("grpc.lb.locality", locality.toString());
}
@Test
public void recordLoadStats() {
LoadBalancerProvider weightedTargetProvider = new WeightedTargetLoadBalancerProvider();
@ -258,7 +292,7 @@ public class ClusterImplLoadBalancerTest {
CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build());
leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY);
assertThat(currentState).isEqualTo(ConnectivityState.READY);
PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs);
assertThat(result.getStatus().isOk()).isTrue();
ClientStreamTracer streamTracer1 = result.getStreamTracerFactory().newClientStreamTracer(
ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata()); // first RPC call
@ -347,7 +381,7 @@ public class ClusterImplLoadBalancerTest {
CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build());
leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY);
assertThat(currentState).isEqualTo(ConnectivityState.READY);
PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs);
assertThat(result.getStatus().isOk()).isFalse();
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription()).isEqualTo("Dropped: throttle");
@ -373,7 +407,7 @@ public class ClusterImplLoadBalancerTest {
.build())
.setLoadBalancingPolicyConfig(config)
.build());
result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
result = currentPicker.pickSubchannel(pickSubchannelArgs);
assertThat(result.getStatus().isOk()).isFalse();
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription()).isEqualTo("Dropped: lb");
@ -386,7 +420,7 @@ public class ClusterImplLoadBalancerTest {
.isEqualTo(1L);
assertThat(clusterStats.totalDroppedRequests()).isEqualTo(1L);
result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
result = currentPicker.pickSubchannel(pickSubchannelArgs);
assertThat(result.getStatus().isOk()).isTrue();
}
@ -423,7 +457,7 @@ public class ClusterImplLoadBalancerTest {
leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY);
assertThat(currentState).isEqualTo(ConnectivityState.READY);
for (int i = 0; i < maxConcurrentRequests; i++) {
PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs);
assertThat(result.getStatus().isOk()).isTrue();
ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory();
streamTracerFactory.newClientStreamTracer(
@ -434,7 +468,7 @@ public class ClusterImplLoadBalancerTest {
assertThat(clusterStats.clusterServiceName()).isEqualTo(EDS_SERVICE_NAME);
assertThat(clusterStats.totalDroppedRequests()).isEqualTo(0L);
PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs);
clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER));
assertThat(clusterStats.clusterServiceName()).isEqualTo(EDS_SERVICE_NAME);
if (enableCircuitBreaking) {
@ -455,7 +489,7 @@ public class ClusterImplLoadBalancerTest {
new PolicySelection(weightedTargetProvider, weightedTargetConfig), null);
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
result = currentPicker.pickSubchannel(pickSubchannelArgs);
assertThat(result.getStatus().isOk()).isTrue();
result.getStreamTracerFactory().newClientStreamTracer(
ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata()); // 101th request
@ -463,7 +497,7 @@ public class ClusterImplLoadBalancerTest {
assertThat(clusterStats.clusterServiceName()).isEqualTo(EDS_SERVICE_NAME);
assertThat(clusterStats.totalDroppedRequests()).isEqualTo(0L);
result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); // 102th request
result = currentPicker.pickSubchannel(pickSubchannelArgs); // 102th request
clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER));
assertThat(clusterStats.clusterServiceName()).isEqualTo(EDS_SERVICE_NAME);
if (enableCircuitBreaking) {
@ -511,7 +545,7 @@ public class ClusterImplLoadBalancerTest {
leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY);
assertThat(currentState).isEqualTo(ConnectivityState.READY);
for (int i = 0; i < ClusterImplLoadBalancer.DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; i++) {
PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs);
assertThat(result.getStatus().isOk()).isTrue();
ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory();
streamTracerFactory.newClientStreamTracer(
@ -522,7 +556,7 @@ public class ClusterImplLoadBalancerTest {
assertThat(clusterStats.clusterServiceName()).isEqualTo(EDS_SERVICE_NAME);
assertThat(clusterStats.totalDroppedRequests()).isEqualTo(0L);
PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs);
clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER));
assertThat(clusterStats.clusterServiceName()).isEqualTo(EDS_SERVICE_NAME);
if (enableCircuitBreaking) {
@ -697,7 +731,11 @@ public class ClusterImplLoadBalancerTest {
}
EquivalentAddressGroup eag = new EquivalentAddressGroup(new FakeSocketAddress(name),
Attributes.newBuilder().set(InternalXdsAttributes.ATTR_LOCALITY, locality).build());
Attributes.newBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY, locality)
// Unique but arbitrary string
.set(InternalXdsAttributes.ATTR_LOCALITY_NAME, locality.toString())
.build());
return AddressFilter.setPathFilter(eag, Collections.singletonList(locality.toString()));
}

View File

@ -21,6 +21,7 @@ import static io.grpc.xds.XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.WRR_LOCALITY_POLICY_NAME;
import static java.util.stream.Collectors.toList;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@ -909,11 +910,12 @@ public class ClusterResolverLoadBalancerTest {
assertAddressesEqual(Arrays.asList(endpoint3, endpoint1, endpoint2),
childBalancer.addresses); // ordered by cluster then addresses
assertAddressesEqual(AddressFilter.filter(AddressFilter.filter(
childBalancer.addresses, CLUSTER1 + "[child1]"), locality1.toString()),
childBalancer.addresses, CLUSTER1 + "[child1]"),
"{region=\"test-region-1\", zone=\"test-zone-1\", sub_zone=\"test-subzone-1\"}"),
Collections.singletonList(endpoint3));
assertAddressesEqual(AddressFilter.filter(AddressFilter.filter(
childBalancer.addresses, CLUSTER_DNS + "[child0]"),
Locality.create("", "", "").toString()),
"{region=\"\", zone=\"\", sub_zone=\"\"}"),
Arrays.asList(endpoint1, endpoint2));
}
@ -1142,10 +1144,11 @@ public class ClusterResolverLoadBalancerTest {
/** Asserts two list of EAGs contains same addresses, regardless of attributes. */
private static void assertAddressesEqual(
List<EquivalentAddressGroup> expected, List<EquivalentAddressGroup> actual) {
assertThat(actual.size()).isEqualTo(expected.size());
for (int i = 0; i < actual.size(); i++) {
assertThat(actual.get(i).getAddresses()).isEqualTo(expected.get(i).getAddresses());
}
List<List<SocketAddress>> expectedAddresses
= expected.stream().map(EquivalentAddressGroup::getAddresses).collect(toList());
List<List<SocketAddress>> actualAddresses
= actual.stream().map(EquivalentAddressGroup::getAddresses).collect(toList());
assertThat(actualAddresses).isEqualTo(expectedAddresses);
}
private static EquivalentAddressGroup makeAddress(final String name) {

View File

@ -220,6 +220,8 @@ public class WeightedTargetLoadBalancerTest {
ResolvedAddresses resolvedAddresses = resolvedAddressesCaptor.getValue();
assertThat(resolvedAddresses.getLoadBalancingPolicyConfig()).isEqualTo(configs[i]);
assertThat(resolvedAddresses.getAttributes().get(fakeKey)).isEqualTo(fakeValue);
assertThat(resolvedAddresses.getAttributes().get(WeightedTargetLoadBalancer.CHILD_NAME))
.isEqualTo("target" + i);
assertThat(Iterables.getOnlyElement(resolvedAddresses.getAddresses()).getAddresses())
.containsExactly(socketAddresses[i]);
}

View File

@ -42,7 +42,6 @@ import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
import io.grpc.xds.WrrLocalityLoadBalancer.WrrLocalityConfig;
import io.grpc.xds.client.Locality;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
@ -111,8 +110,8 @@ public class WrrLocalityLoadBalancerTest {
@Test
public void handleResolvedAddresses() {
// A two locality cluster with a mock child LB policy.
Locality localityOne = Locality.create("region1", "zone1", "subzone1");
Locality localityTwo = Locality.create("region2", "zone2", "subzone2");
String localityOne = "localityOne";
String localityTwo = "localityTwo";
PolicySelection childPolicy = new PolicySelection(mockChildProvider, null);
// The child config is delivered wrapped in the wrr_locality config and the locality weights
@ -130,9 +129,9 @@ public class WrrLocalityLoadBalancerTest {
assertThat(config).isInstanceOf(WeightedTargetConfig.class);
WeightedTargetConfig wtConfig = (WeightedTargetConfig) config;
assertThat(wtConfig.targets).hasSize(2);
assertThat(wtConfig.targets).containsEntry(localityOne.toString(),
assertThat(wtConfig.targets).containsEntry(localityOne,
new WeightedPolicySelection(1, childPolicy));
assertThat(wtConfig.targets).containsEntry(localityTwo.toString(),
assertThat(wtConfig.targets).containsEntry(localityTwo,
new WeightedPolicySelection(2, childPolicy));
}
@ -144,8 +143,7 @@ public class WrrLocalityLoadBalancerTest {
// The child config is delivered wrapped in the wrr_locality config and the locality weights
// in a ResolvedAddresses attribute.
WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy);
deliverAddresses(wlConfig, ImmutableList.of(
makeAddress("addr", Locality.create("test-region", "test-zone", "test-subzone"), null)));
deliverAddresses(wlConfig, ImmutableList.of(makeAddress("addr", "test-locality", null)));
// With no locality weights, we should get a TRANSIENT_FAILURE.
verify(mockHelper).getAuthority();
@ -166,8 +164,7 @@ public class WrrLocalityLoadBalancerTest {
@Test
public void handleNameResolutionError_withChildLb() {
deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockChildProvider, null)),
ImmutableList.of(
makeAddress("addr1", Locality.create("test-region1", "test-zone", "test-subzone"), 1)));
ImmutableList.of(makeAddress("addr1", "test-locality", 1)));
Status status = Status.DEADLINE_EXCEEDED.withDescription("too slow");
loadBalancer.handleNameResolutionError(status);
@ -181,8 +178,7 @@ public class WrrLocalityLoadBalancerTest {
PolicySelection childPolicy = new PolicySelection(mockChildProvider, null);
WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy);
deliverAddresses(wlConfig, ImmutableList.of(
makeAddress("addr1", Locality.create("test-region1", "test-zone", "test-subzone"), 1)));
deliverAddresses(wlConfig, ImmutableList.of(makeAddress("addr1", "test-locality", 1)));
// Assert that the child policy and the locality weights were correctly mapped to a
// WeightedTargetConfig.
@ -195,8 +191,7 @@ public class WrrLocalityLoadBalancerTest {
@Test
public void shutdown() {
deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockChildProvider, null)),
ImmutableList.of(
makeAddress("addr", Locality.create("test-region", "test-zone", "test-subzone"), 1)));
ImmutableList.of(makeAddress("addr", "test-locality", 1)));
loadBalancer.shutdown();
verify(mockWeightedTargetLb).shutdown();
@ -224,7 +219,7 @@ public class WrrLocalityLoadBalancerTest {
/**
* Create a locality-labeled address.
*/
private static EquivalentAddressGroup makeAddress(final String name, Locality locality,
private static EquivalentAddressGroup makeAddress(final String name, String locality,
Integer localityWeight) {
class FakeSocketAddress extends SocketAddress {
private final String name;
@ -257,13 +252,13 @@ public class WrrLocalityLoadBalancerTest {
}
Attributes.Builder attrBuilder = Attributes.newBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY, locality);
.set(InternalXdsAttributes.ATTR_LOCALITY_NAME, locality);
if (localityWeight != null) {
attrBuilder.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT, localityWeight);
}
EquivalentAddressGroup eag = new EquivalentAddressGroup(new FakeSocketAddress(name),
attrBuilder.build());
return AddressFilter.setPathFilter(eag, Collections.singletonList(locality.toString()));
return AddressFilter.setPathFilter(eag, Collections.singletonList(locality));
}
}