xds: implement LRS LB policy (#6858)

Part of xDS LB policy refactoring work. Implement the LRS LB policy for "balancing" endpoints within a certain locality.
This commit is contained in:
Chengyuan Zhang 2020-04-07 11:44:36 -07:00 committed by GitHub
parent 58a92b7530
commit d88f0f19ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 560 additions and 16 deletions

View File

@ -51,28 +51,26 @@ final class EnvoyProtoData {
static final class Locality {
private final String region;
private final String zone;
private final String subzone;
private final String subZone;
/** Must only be used for testing. */
@VisibleForTesting
Locality(String region, String zone, String subzone) {
Locality(String region, String zone, String subZone) {
this.region = region;
this.zone = zone;
this.subzone = subzone;
this.subZone = subZone;
}
static Locality fromEnvoyProtoLocality(io.envoyproxy.envoy.api.v2.core.Locality locality) {
return new Locality(
/* region = */ locality.getRegion(),
/* zone = */ locality.getZone(),
/* subzone = */ locality.getSubZone());
/* subZone = */ locality.getSubZone());
}
io.envoyproxy.envoy.api.v2.core.Locality toEnvoyProtoLocality() {
return io.envoyproxy.envoy.api.v2.core.Locality.newBuilder()
.setRegion(region)
.setZone(zone)
.setSubZone(subzone)
.setSubZone(subZone)
.build();
}
@ -84,8 +82,8 @@ final class EnvoyProtoData {
return zone;
}
String getSubzone() {
return subzone;
String getSubZone() {
return subZone;
}
@Override
@ -99,12 +97,12 @@ final class EnvoyProtoData {
Locality locality = (Locality) o;
return Objects.equals(region, locality.region)
&& Objects.equals(zone, locality.zone)
&& Objects.equals(subzone, locality.subzone);
&& Objects.equals(subZone, locality.subZone);
}
@Override
public int hashCode() {
return Objects.hash(region, zone, subzone);
return Objects.hash(region, zone, subZone);
}
@Override
@ -112,7 +110,7 @@ final class EnvoyProtoData {
return MoreObjects.toStringHelper(this)
.add("region", region)
.add("zone", zone)
.add("subzone", subzone)
.add("subZone", subZone)
.toString();
}
}

View File

@ -421,7 +421,7 @@ interface LocalityStore {
@Override
public String getAuthority() {
//FIXME: This should be a new proposed field of Locality, locality_name
return locality.getSubzone();
return locality.getSubZone();
}
};
orcaReportingHelperWrapper =

View File

@ -0,0 +1,123 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.ClientLoadCounter.LoadRecordingSubchannelPicker;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.LrsLoadBalancerProvider.LrsConfig;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.Objects;
import javax.annotation.CheckForNull;
/**
* Load balancer for lrs policy.
*/
final class LrsLoadBalancer extends LoadBalancer {
private final LoadBalancer.Helper helper;
@CheckForNull
private GracefulSwitchLoadBalancer switchingLoadBalancer;
private LoadStatsStore loadStatsStore;
private String clusterName;
private String edsServiceName;
private Locality locality;
private String childPolicyName;
LrsLoadBalancer(LoadBalancer.Helper helper) {
this.helper = checkNotNull(helper, "helper");
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
LrsConfig config = (LrsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
LoadStatsStore store =
resolvedAddresses.getAttributes().get(XdsAttributes.ATTR_CLUSTER_SERVICE_LOAD_STATS_STORE);
checkNotNull(config, "missing LRS lb config");
checkNotNull(store, "missing cluster service stats object");
checkAndSetUp(config, store);
if (switchingLoadBalancer == null) {
loadStatsStore.addLocality(config.locality);
final ClientLoadCounter counter = loadStatsStore.getLocalityCounter(config.locality);
LoadBalancer.Helper loadRecordingHelper = new ForwardingLoadBalancerHelper() {
@Override
protected Helper delegate() {
return helper;
}
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
SubchannelPicker loadRecordingPicker =
new LoadRecordingSubchannelPicker(counter, newPicker);
super.updateBalancingState(newState, loadRecordingPicker);
}
};
switchingLoadBalancer = new GracefulSwitchLoadBalancer(loadRecordingHelper);
}
String updatedChildPolicyName = config.childPolicy.getProvider().getPolicyName();
if (!Objects.equals(childPolicyName, updatedChildPolicyName)) {
switchingLoadBalancer.switchTo(config.childPolicy.getProvider());
childPolicyName = updatedChildPolicyName;
}
ResolvedAddresses downStreamResult =
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(config.childPolicy.getConfig())
.build();
switchingLoadBalancer.handleResolvedAddresses(downStreamResult);
}
@Override
public void handleNameResolutionError(Status error) {
if (switchingLoadBalancer != null) {
switchingLoadBalancer.handleNameResolutionError(error);
} else {
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker(error));
}
}
@Override
public void shutdown() {
if (switchingLoadBalancer != null) {
loadStatsStore.removeLocality(locality);
switchingLoadBalancer.shutdown();
}
}
private void checkAndSetUp(LrsConfig config, LoadStatsStore store) {
checkState(
clusterName == null || clusterName.equals(config.clusterName),
"cluster name should not change");
checkState(
edsServiceName == null || edsServiceName.equals(config.edsServiceName),
"edsServiceName should not change");
checkState(locality == null || locality.equals(config.locality), "locality should not change");
checkState(
loadStatsStore == null || loadStatsStore.equals(store),
"loadStatsStore should not change");
clusterName = config.clusterName;
edsServiceName = config.edsServiceName;
locality = config.locality;
loadStatsStore = store;
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.EnvoyProtoData.Locality;
import java.util.Map;
import javax.annotation.Nullable;
/**
* Provider for lrs load balancing policy.
*/
@Internal
public final class LrsLoadBalancerProvider extends LoadBalancerProvider {
private static final String LRS_POLICY_NAME = "lrs_experimental";
@Override
public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
return new LrsLoadBalancer(helper);
}
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 5;
}
@Override
public String getPolicyName() {
return LRS_POLICY_NAME;
}
@Override
public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawConfig) {
throw new UnsupportedOperationException();
}
static final class LrsConfig {
final String clusterName;
@Nullable
final String edsServiceName;
final String lrsServerName;
final Locality locality;
final PolicySelection childPolicy;
LrsConfig(
String clusterName,
@Nullable String edsServiceName,
String lrsServerName,
Locality locality,
PolicySelection childPolicy) {
this.clusterName = checkNotNull(clusterName, "clusterName");
this.edsServiceName = edsServiceName;
this.lrsServerName = checkNotNull(lrsServerName, "lrsServerName");
this.locality = checkNotNull(locality, "locality");
this.childPolicy = checkNotNull(childPolicy, "childPolicy");
}
}
}

View File

@ -79,5 +79,10 @@ public final class XdsAttributes {
static final Attributes.Key<ObjectPool<XdsClient>> XDS_CLIENT_POOL =
Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsClientPool");
// TODO (chengyuanzhang): temporary solution for migrating to LRS policy. Should access
// stats object via XdsClient interface.
static final Attributes.Key<LoadStatsStore> ATTR_CLUSTER_SERVICE_LOAD_STATS_STORE =
Attributes.Key.create("io.grpc.xds.XdsAttributes.loadStatsStore");
private XdsAttributes() {}
}

View File

@ -41,7 +41,7 @@ public class EnvoyProtoDataTest {
Locality xdsLocality = Locality.fromEnvoyProtoLocality(locality);
assertThat(xdsLocality.getRegion()).isEqualTo("test_region");
assertThat(xdsLocality.getZone()).isEqualTo("test_zone");
assertThat(xdsLocality.getSubzone()).isEqualTo("test_subzone");
assertThat(xdsLocality.getSubZone()).isEqualTo("test_subzone");
io.envoyproxy.envoy.api.v2.core.Locality convertedLocality = xdsLocality.toEnvoyProtoLocality();
assertThat(convertedLocality.getRegion()).isEqualTo("test_region");

View File

@ -0,0 +1,334 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.grpc.Attributes;
import io.grpc.ClientStreamTracer;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.Status;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.ClientLoadCounter.LoadRecordingStreamTracerFactory;
import io.grpc.xds.ClientLoadCounter.LoadRecordingSubchannelPicker;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.LrsLoadBalancerProvider.LrsConfig;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
/**
* Unit tests for {@link LrsLoadBalancer}.
*/
@RunWith(JUnit4.class)
public class LrsLoadBalancerTest {
@Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule();
private static final String CLUSTER_NAME = "cluster-foo.googleapis.com";
private static final String EDS_SERVICE_NAME = "cluster-foo:service-blade";
private static final String LRS_SERVER_NAME = "trafficdirector.googleapis.com";
private static final Locality TEST_LOCALITY =
new Locality("test-region", "test-zone", "test-subzone");
private final ClientLoadCounter counter = new ClientLoadCounter();
private final LoadRecorder loadRecorder = new LoadRecorder();
private final Queue<LoadBalancer> childBalancers = new ArrayDeque<>();
@Mock
private Helper helper;
private LrsLoadBalancer loadBalancer;
@Before
public void setUp() {
loadBalancer = new LrsLoadBalancer(helper);
}
@After
public void tearDown() {
loadBalancer.shutdown();
}
@Test
public void subchannelPickerInterceptedWithLoadRecording() {
List<EquivalentAddressGroup> backendAddrs = createResolvedBackendAddresses(2);
deliverResolvedAddresses(backendAddrs, "round_robin");
FakeLoadBalancer childBalancer = (FakeLoadBalancer) childBalancers.poll();
NoopSubchannel subchannel = childBalancer.subchannels.values().iterator().next();
deliverSubchannelState(subchannel, ConnectivityState.READY);
assertThat(loadRecorder.recording).isTrue();
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(null);
verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
assertThat(picker).isInstanceOf(LoadRecordingSubchannelPicker.class);
PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class));
ClientStreamTracer.Factory tracerFactory = result.getStreamTracerFactory();
assertThat(((LoadRecordingStreamTracerFactory) tracerFactory).getCounter())
.isSameInstanceAs(counter);
loadBalancer.shutdown();
assertThat(childBalancer.shutdown).isTrue();
assertThat(loadRecorder.recording).isFalse();
}
@Test
public void updateChildPolicy() {
List<EquivalentAddressGroup> backendAddrs = createResolvedBackendAddresses(2);
deliverResolvedAddresses(backendAddrs, "round_robin");
FakeLoadBalancer childBalancer = (FakeLoadBalancer) childBalancers.poll();
assertThat(childBalancer.name).isEqualTo("round_robin");
deliverResolvedAddresses(backendAddrs, "pick_first");
assertThat(childBalancer.shutdown).isTrue();
childBalancer = (FakeLoadBalancer) childBalancers.poll();
assertThat(childBalancer.name).isEqualTo("pick_first");
loadBalancer.shutdown();
assertThat(childBalancer.shutdown).isTrue();
}
@Test
public void errorPropagation() {
loadBalancer.handleNameResolutionError(Status.UNKNOWN.withDescription("I failed"));
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(null);
verify(helper)
.updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
Status status =
pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)).getStatus();
assertThat(status.getDescription()).contains("I failed");
List<EquivalentAddressGroup> backendAddrs = createResolvedBackendAddresses(2);
deliverResolvedAddresses(backendAddrs, "round_robin");
// Error after child policy is created.
loadBalancer.handleNameResolutionError(Status.UNKNOWN.withDescription("I failed"));
verify(helper, times(2))
.updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
status = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)).getStatus();
assertThat(status.getDescription()).contains("I failed");
assertThat(status.getDescription()).contains("handled by downstream balancer");
}
private void deliverResolvedAddresses(
List<EquivalentAddressGroup> addresses, String childPolicy) {
PolicySelection childPolicyConfig =
new PolicySelection(new FakeLoadBalancerProvider(childPolicy), null, null);
LrsConfig config =
new LrsConfig(
CLUSTER_NAME, EDS_SERVICE_NAME, LRS_SERVER_NAME, TEST_LOCALITY, childPolicyConfig);
ResolvedAddresses resolvedAddresses =
ResolvedAddresses.newBuilder()
.setAddresses(addresses)
.setAttributes(
Attributes.newBuilder()
.set(XdsAttributes.ATTR_CLUSTER_SERVICE_LOAD_STATS_STORE, loadRecorder)
.build())
.setLoadBalancingPolicyConfig(config)
.build();
loadBalancer.handleResolvedAddresses(resolvedAddresses);
}
private static List<EquivalentAddressGroup> createResolvedBackendAddresses(int n) {
List<EquivalentAddressGroup> list = new ArrayList<>();
for (int i = 0; i < n; i++) {
SocketAddress addr = new FakeSocketAddress("fake-address-" + i);
list.add(new EquivalentAddressGroup(addr));
}
return list;
}
private static void deliverSubchannelState(
final NoopSubchannel subchannel, ConnectivityState state) {
SubchannelPicker picker = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel);
}
};
subchannel.helper.updateBalancingState(state, picker);
}
private final class FakeLoadBalancerProvider extends LoadBalancerProvider {
private final String policyName;
FakeLoadBalancerProvider(String policyName) {
this.policyName = policyName;
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
LoadBalancer balancer = new FakeLoadBalancer(helper, policyName);
childBalancers.add(balancer);
return balancer;
}
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 0; // doesn't matter
}
@Override
public String getPolicyName() {
return policyName;
}
}
private static final class FakeLoadBalancer extends LoadBalancer {
private final Helper helper;
private final String name;
private boolean shutdown;
private final Map<EquivalentAddressGroup, NoopSubchannel> subchannels = new HashMap<>();
FakeLoadBalancer(Helper helper, String name) {
this.helper = helper;
this.name = name;
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> addresses = resolvedAddresses.getAddresses();
for (EquivalentAddressGroup eag : addresses) {
subchannels.put(eag, new NoopSubchannel(helper));
}
}
@Override
public void handleNameResolutionError(final Status error) {
SubchannelPicker picker = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error.augmentDescription("handled by downstream balancer"));
}
};
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, picker);
}
@Override
public void shutdown() {
shutdown = true;
}
}
private static final class NoopSubchannel extends Subchannel {
final Helper helper;
NoopSubchannel(Helper helper) {
this.helper = helper;
}
@Override
public void shutdown() {
}
@Override
public void requestConnection() {
}
@Override
public Attributes getAttributes() {
return Attributes.EMPTY;
}
}
private static final class FakeSocketAddress extends SocketAddress {
final String name;
FakeSocketAddress(String name) {
this.name = name;
}
@Override
public String toString() {
return "FakeSocketAddress-" + name;
}
@Override
public boolean equals(Object other) {
if (other instanceof FakeSocketAddress) {
FakeSocketAddress otherAddr = (FakeSocketAddress) other;
return name.equals(otherAddr.name);
}
return false;
}
@Override
public int hashCode() {
return name.hashCode();
}
}
private final class LoadRecorder implements LoadStatsStore {
private boolean recording = false;
@Override
public ClusterStats generateLoadReport() {
throw new UnsupportedOperationException("should not be called");
}
@Override
public void addLocality(Locality locality) {
assertThat(locality).isEqualTo(TEST_LOCALITY);
recording = true;
}
@Override
public void removeLocality(Locality locality) {
assertThat(locality).isEqualTo(TEST_LOCALITY);
recording = false;
}
@Override
public ClientLoadCounter getLocalityCounter(Locality locality) {
assertThat(locality).isEqualTo(TEST_LOCALITY);
return counter;
}
@Override
public void recordDroppedRequest(String category) {
throw new UnsupportedOperationException("should not be called");
}
}
}

View File

@ -176,7 +176,7 @@ class XdsClientTestHelper {
}
static io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints buildLocalityLbEndpoints(
String region, String zone, String subzone,
String region, String zone, String subZone,
List<io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint> lbEndpoints,
int loadBalancingWeight, int priority) {
return
@ -185,7 +185,7 @@ class XdsClientTestHelper {
io.envoyproxy.envoy.api.v2.core.Locality.newBuilder()
.setRegion(region)
.setZone(zone)
.setSubZone(subzone))
.setSubZone(subZone))
.addAllLbEndpoints(lbEndpoints)
.setLoadBalancingWeight(UInt32Value.newBuilder().setValue(loadBalancingWeight))
.setPriority(priority)