mirror of https://github.com/grpc/grpc-java.git
xds: add CdsLoadBalancer
- Contains `ClusterWatcher` implementation. On cluster update, `ClusterWatcherImpl` spawns an EDS child balancer if not created, then based on the ClusterUpdate data, it sends an edsConfig to the EDS child balancer. - `CdsLoadBalancer` reads `XdsClientRef` and `CdsConfig` from `resolvedAddresses`. Base on `resolvedAddresses`, it register a `ClusterWatcherImpl` to `XdsClient`. - For a different cluster resource name in CdsConfig when `handleResolvedAddresses()`, `CdsLoadBalancer` will gracefully switch to the new cluster.
This commit is contained in:
parent
b833aae615
commit
9c2501f146
|
@ -0,0 +1,289 @@
|
|||
/*
|
||||
* Copyright 2019 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
||||
import static io.grpc.xds.XdsLoadBalancerProvider.XDS_POLICY_NAME;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.ChannelLogger;
|
||||
import io.grpc.ChannelLogger.ChannelLogLevel;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.LoadBalancerRegistry;
|
||||
import io.grpc.NameResolver.ConfigOrError;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||
import io.grpc.util.GracefulSwitchLoadBalancer;
|
||||
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
|
||||
import io.grpc.xds.XdsClient.ClusterUpdate;
|
||||
import io.grpc.xds.XdsClient.ClusterWatcher;
|
||||
import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig;
|
||||
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
|
||||
import java.util.Map;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* Load balancer for experimental_cds LB policy.
|
||||
*/
|
||||
public final class CdsLoadBalancer extends LoadBalancer {
|
||||
private final ChannelLogger channelLogger;
|
||||
private final LoadBalancerRegistry lbRegistry;
|
||||
private final GracefulSwitchLoadBalancer switchingLoadBalancer;
|
||||
private final Helper helper;
|
||||
|
||||
// The following fields become non-null once handleResolvedAddresses() successfully.
|
||||
|
||||
// Most recent CdsConfig.
|
||||
@Nullable
|
||||
private CdsConfig cdsConfig;
|
||||
// Most recent ClusterWatcher.
|
||||
@Nullable
|
||||
private ClusterWatcher clusterWatcher;
|
||||
@Nullable
|
||||
private ObjectPool<XdsClient> xdsClientRef;
|
||||
@Nullable
|
||||
private XdsClient xdsClient;
|
||||
|
||||
CdsLoadBalancer(Helper helper) {
|
||||
this(helper, LoadBalancerRegistry.getDefaultRegistry());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
CdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry) {
|
||||
this.helper = helper;
|
||||
this.channelLogger = helper.getChannelLogger();
|
||||
this.lbRegistry = lbRegistry;
|
||||
this.switchingLoadBalancer = new GracefulSwitchLoadBalancer(helper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||
channelLogger.log(ChannelLogLevel.DEBUG, "Received ResolvedAddresses '%s'", resolvedAddresses);
|
||||
Attributes attributes = resolvedAddresses.getAttributes();
|
||||
if (xdsClientRef == null) {
|
||||
xdsClientRef = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CLIENT_REF);
|
||||
if (xdsClientRef == null) {
|
||||
// TODO(zdapeng): create a new xdsClient from bootstrap if no one exists.
|
||||
helper.updateBalancingState(
|
||||
TRANSIENT_FAILURE,
|
||||
new ErrorPicker(Status.UNAVAILABLE.withDescription(
|
||||
"XDS_CLIENT_REF attributes not available from resolve addresses")));
|
||||
return;
|
||||
}
|
||||
xdsClient = xdsClientRef.getObject();
|
||||
}
|
||||
|
||||
Map<String, ?> newRawLbConfig = attributes.get(ATTR_LOAD_BALANCING_CONFIG);
|
||||
if (newRawLbConfig == null) {
|
||||
// This will not happen when the service config error handling is implemented.
|
||||
// For now simply go to TRANSIENT_FAILURE.
|
||||
helper.updateBalancingState(
|
||||
TRANSIENT_FAILURE,
|
||||
new ErrorPicker(
|
||||
Status.UNAVAILABLE.withDescription("ATTR_LOAD_BALANCING_CONFIG not available")));
|
||||
return;
|
||||
}
|
||||
ConfigOrError cfg =
|
||||
CdsLoadBalancerProvider.parseLoadBalancingConfigPolicy(newRawLbConfig);
|
||||
if (cfg.getError() != null) {
|
||||
// This will not happen when the service config error handling is implemented.
|
||||
// For now simply go to TRANSIENT_FAILURE.
|
||||
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(cfg.getError()));
|
||||
return;
|
||||
}
|
||||
final CdsConfig newCdsConfig = (CdsConfig) cfg.getConfig();
|
||||
|
||||
// If CdsConfig is changed, do a graceful switch.
|
||||
if (!newCdsConfig.equals(cdsConfig)) {
|
||||
LoadBalancerProvider fixedCdsConfigBalancerProvider =
|
||||
new FixedCdsConfigBalancerProvider(newCdsConfig);
|
||||
switchingLoadBalancer.switchTo(fixedCdsConfigBalancerProvider);
|
||||
}
|
||||
|
||||
switchingLoadBalancer.handleResolvedAddresses(resolvedAddresses);
|
||||
|
||||
// The clusterWatcher is also updated after switchingLoadBalancer.handleResolvedAddresses().
|
||||
cdsConfig = newCdsConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleNameResolutionError(Status error) {
|
||||
channelLogger.log(ChannelLogLevel.ERROR, "Name resolution error: '%s'", error);
|
||||
// Go into TRANSIENT_FAILURE if we have not yet received any cluster resource. Otherwise,
|
||||
// we keep running with the data we had previously.
|
||||
if (clusterWatcher == null) {
|
||||
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
|
||||
} else {
|
||||
switchingLoadBalancer.handleNameResolutionError(error);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canHandleEmptyAddressListFromNameResolution() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
channelLogger.log(ChannelLogLevel.DEBUG, "CDS load balancer is shutting down");
|
||||
|
||||
switchingLoadBalancer.shutdown();
|
||||
if (xdsClientRef != null) {
|
||||
xdsClientRef.returnObject(xdsClient);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A LoadBalancerProvider that provides a load balancer with a fixed CdsConfig.
|
||||
*/
|
||||
private final class FixedCdsConfigBalancerProvider extends LoadBalancerProvider {
|
||||
|
||||
final CdsConfig cdsConfig;
|
||||
final CdsConfig oldCdsConfig;
|
||||
final ClusterWatcher oldClusterWatcher;
|
||||
|
||||
FixedCdsConfigBalancerProvider(CdsConfig cdsConfig) {
|
||||
this.cdsConfig = cdsConfig;
|
||||
oldCdsConfig = CdsLoadBalancer.this.cdsConfig;
|
||||
oldClusterWatcher = CdsLoadBalancer.this.clusterWatcher;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAvailable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPriority() {
|
||||
return 5;
|
||||
}
|
||||
|
||||
// A synthetic policy name identified by CDS config.
|
||||
@Override
|
||||
public String getPolicyName() {
|
||||
return "cds_policy__cluster_name_" + cdsConfig.name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LoadBalancer newLoadBalancer(final Helper helper) {
|
||||
return new LoadBalancer() {
|
||||
// Becomes non-null once handleResolvedAddresses() successfully.
|
||||
// Assigned at most once.
|
||||
@Nullable
|
||||
ClusterWatcherImpl clusterWatcher;
|
||||
|
||||
@Override
|
||||
public void handleNameResolutionError(Status error) {
|
||||
if (clusterWatcher == null || clusterWatcher.edsBalancer == null) {
|
||||
// Go into TRANSIENT_FAILURE if we have not yet received any cluster resource.
|
||||
// Otherwise, we keep running with the data we had previously.
|
||||
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canHandleEmptyAddressListFromNameResolution() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
if (clusterWatcher != null) {
|
||||
if (clusterWatcher.edsBalancer != null) {
|
||||
clusterWatcher.edsBalancer.shutdown();
|
||||
}
|
||||
xdsClient.cancelClusterDataWatch(cdsConfig.name, clusterWatcher);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||
if (clusterWatcher == null) {
|
||||
clusterWatcher = new ClusterWatcherImpl(helper, resolvedAddresses);
|
||||
xdsClient.watchClusterData(cdsConfig.name, clusterWatcher);
|
||||
if (oldCdsConfig != null) {
|
||||
xdsClient.cancelClusterDataWatch(oldCdsConfig.name, oldClusterWatcher);
|
||||
}
|
||||
CdsLoadBalancer.this.clusterWatcher = clusterWatcher;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private final class ClusterWatcherImpl implements ClusterWatcher {
|
||||
|
||||
final Helper helper;
|
||||
final ResolvedAddresses resolvedAddresses;
|
||||
|
||||
// EDS balancer for the cluster.
|
||||
// Becomes non-null once handleResolvedAddresses() successfully.
|
||||
// Assigned at most once.
|
||||
@Nullable
|
||||
LoadBalancer edsBalancer;
|
||||
|
||||
ClusterWatcherImpl(Helper helper, ResolvedAddresses resolvedAddresses) {
|
||||
this.helper = helper;
|
||||
this.resolvedAddresses = resolvedAddresses;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterChanged(ClusterUpdate newUpdate) {
|
||||
channelLogger.log(
|
||||
ChannelLogLevel.DEBUG, "CDS load balancer received a cluster update: '%s'", newUpdate);
|
||||
checkArgument(
|
||||
newUpdate.getLbPolicy().equals("round_robin"),
|
||||
"The load balancing policy in ClusterUpdate '%s' is not supported", newUpdate);
|
||||
|
||||
final XdsConfig edsConfig = new XdsConfig(
|
||||
/* balancerName = */ null,
|
||||
new LbConfig(newUpdate.getLbPolicy(), ImmutableMap.<String, Object>of()),
|
||||
/* fallbackPolicy = */ null,
|
||||
/* edsServiceName = */ newUpdate.getEdsServiceName(),
|
||||
/* lrsServerName = */ newUpdate.getLrsServerName());
|
||||
if (edsBalancer == null) {
|
||||
edsBalancer = lbRegistry.getProvider(XDS_POLICY_NAME).newLoadBalancer(helper);
|
||||
}
|
||||
edsBalancer.handleResolvedAddresses(
|
||||
resolvedAddresses.toBuilder()
|
||||
.setAttributes(
|
||||
resolvedAddresses.getAttributes().toBuilder()
|
||||
.discard(ATTR_LOAD_BALANCING_CONFIG)
|
||||
.build())
|
||||
.setLoadBalancingPolicyConfig(edsConfig)
|
||||
.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Status error) {
|
||||
channelLogger.log(ChannelLogLevel.ERROR, "CDS load balancer received an error: '%s'", error);
|
||||
|
||||
// Go into TRANSIENT_FAILURE if we have not yet created the child
|
||||
// policy (i.e., we have not yet received valid data for the cluster). Otherwise,
|
||||
// we keep running with the data we had previously.
|
||||
if (edsBalancer == null) {
|
||||
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
* Copyright 2019 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
import io.grpc.Internal;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancer.Helper;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.NameResolver.ConfigOrError;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.JsonUtil;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* The provider for the "cds" balancing policy. This class should not be directly referenced in
|
||||
* code. The policy should be accessed through {@link io.grpc.LoadBalancerRegistry#getProvider}
|
||||
* with the name "cds" (currently "experimental_cds").
|
||||
*/
|
||||
@Internal
|
||||
public class CdsLoadBalancerProvider extends LoadBalancerProvider {
|
||||
|
||||
static final String CDS_POLICY_NAME = "experimental_cds";
|
||||
private static final String CLUSTER_KEY = "cluster";
|
||||
|
||||
@Override
|
||||
public boolean isAvailable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPriority() {
|
||||
return 5;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPolicyName() {
|
||||
return CDS_POLICY_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LoadBalancer newLoadBalancer(Helper helper) {
|
||||
return new CdsLoadBalancer(helper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigOrError parseLoadBalancingPolicyConfig(
|
||||
Map<String, ?> rawLoadBalancingPolicyConfig) {
|
||||
return parseLoadBalancingConfigPolicy(rawLoadBalancingPolicyConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses raw load balancing config and returns a {@link ConfigOrError} that contains a
|
||||
* {@link CdsConfig} if parsing is successful.
|
||||
*/
|
||||
static ConfigOrError parseLoadBalancingConfigPolicy(Map<String, ?> rawLoadBalancingPolicyConfig) {
|
||||
try {
|
||||
String cluster =
|
||||
JsonUtil.getString(rawLoadBalancingPolicyConfig, CLUSTER_KEY);
|
||||
return ConfigOrError.fromConfig(new CdsConfig(cluster));
|
||||
} catch (RuntimeException e) {
|
||||
return ConfigOrError.fromError(
|
||||
Status.UNKNOWN.withDescription("Failed to parse config " + e.getMessage()).withCause(e));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a successfully parsed and validated LoadBalancingConfig for CDS.
|
||||
*/
|
||||
static final class CdsConfig {
|
||||
|
||||
/**
|
||||
* Name of cluster to query CDS for.
|
||||
*/
|
||||
final String name;
|
||||
|
||||
CdsConfig(String name) {
|
||||
checkArgument(name != null && !name.isEmpty(), "name is null or empty");
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
CdsConfig cdsConfig = (CdsConfig) o;
|
||||
return Objects.equals(name, cdsConfig.name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(name);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -125,9 +125,21 @@ final class LookasideLb extends LoadBalancer {
|
|||
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||
channelLogger.log(ChannelLogLevel.DEBUG, "Received ResolvedAddresses '%s'", resolvedAddresses);
|
||||
|
||||
// In the future, xdsConfig can be gotten directly by
|
||||
// resolvedAddresses.getLoadBalancingPolicyConfig().
|
||||
Attributes attributes = resolvedAddresses.getAttributes();
|
||||
XdsConfig newXdsConfig;
|
||||
Object lbConfig = resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||
if (lbConfig != null) {
|
||||
if (!(lbConfig instanceof XdsConfig)) {
|
||||
lookasideLbHelper.updateBalancingState(
|
||||
TRANSIENT_FAILURE,
|
||||
new ErrorPicker(Status.UNAVAILABLE.withDescription(
|
||||
"Load balancing config '" + lbConfig + "' is not an XdsConfig")));
|
||||
return;
|
||||
}
|
||||
newXdsConfig = (XdsConfig) lbConfig;
|
||||
} else {
|
||||
// In the future, in all cases xdsConfig can be gotten directly by
|
||||
// resolvedAddresses.getLoadBalancingPolicyConfig().
|
||||
Map<String, ?> newRawLbConfig = attributes.get(ATTR_LOAD_BALANCING_CONFIG);
|
||||
if (newRawLbConfig == null) {
|
||||
// This will not happen when the service config error handling is implemented.
|
||||
|
@ -146,7 +158,8 @@ final class LookasideLb extends LoadBalancer {
|
|||
lookasideLbHelper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(cfg.getError()));
|
||||
return;
|
||||
}
|
||||
XdsConfig newXdsConfig = (XdsConfig) cfg.getConfig();
|
||||
newXdsConfig = (XdsConfig) cfg.getConfig();
|
||||
}
|
||||
ObjectPool<XdsClient> xdsClientRefFromResolver = attributes.get(XdsAttributes.XDS_CLIENT_REF);
|
||||
ObjectPool<XdsClient> xdsClientRef;
|
||||
|
||||
|
|
|
@ -1 +1,2 @@
|
|||
io.grpc.xds.CdsLoadBalancerProvider
|
||||
io.grpc.xds.XdsLoadBalancerProvider
|
||||
|
|
|
@ -0,0 +1,428 @@
|
|||
/*
|
||||
* Copyright 2019 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static io.grpc.ConnectivityState.CONNECTING;
|
||||
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
||||
import static io.grpc.LoadBalancer.ATTR_LOAD_BALANCING_CONFIG;
|
||||
import static io.grpc.xds.XdsLoadBalancerProvider.XDS_POLICY_NAME;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.atLeastOnce;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.ChannelLogger;
|
||||
import io.grpc.ConnectivityState;
|
||||
import io.grpc.EquivalentAddressGroup;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancer.Helper;
|
||||
import io.grpc.LoadBalancer.ResolvedAddresses;
|
||||
import io.grpc.LoadBalancer.SubchannelPicker;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.LoadBalancerRegistry;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.internal.FakeClock;
|
||||
import io.grpc.internal.JsonParser;
|
||||
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||
import io.grpc.xds.XdsClient.ClusterUpdate;
|
||||
import io.grpc.xds.XdsClient.ClusterWatcher;
|
||||
import io.grpc.xds.XdsClient.EndpointUpdate;
|
||||
import io.grpc.xds.XdsClient.EndpointWatcher;
|
||||
import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool;
|
||||
import io.grpc.xds.XdsClient.XdsClientFactory;
|
||||
import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Deque;
|
||||
import java.util.Map;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
/**
|
||||
* Tests for {@link CdsLoadBalancer}.
|
||||
*/
|
||||
@RunWith(JUnit4.class)
|
||||
public class CdsLoadBalancerTest {
|
||||
|
||||
private final RefCountedXdsClientObjectPool xdsClientRef = new RefCountedXdsClientObjectPool(
|
||||
new XdsClientFactory() {
|
||||
@Override
|
||||
XdsClient createXdsClient() {
|
||||
xdsClient = mock(XdsClient.class);
|
||||
return xdsClient;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry();
|
||||
private final LoadBalancerProvider fakeXdsLoadBlancerProvider = new LoadBalancerProvider() {
|
||||
@Override
|
||||
public boolean isAvailable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPriority() {
|
||||
return 5;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPolicyName() {
|
||||
return XDS_POLICY_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LoadBalancer newLoadBalancer(Helper helper) {
|
||||
edsLbHelpers.add(helper);
|
||||
LoadBalancer edsLoadBalancer = mock(LoadBalancer.class);
|
||||
edsLoadBalancers.add(edsLoadBalancer);
|
||||
return edsLoadBalancer;
|
||||
}
|
||||
};
|
||||
|
||||
private final SynchronizationContext syncContext = new SynchronizationContext(
|
||||
new Thread.UncaughtExceptionHandler() {
|
||||
@Override
|
||||
public void uncaughtException(Thread t, Throwable e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
});
|
||||
|
||||
private final FakeClock fakeClock = new FakeClock();
|
||||
private final Deque<LoadBalancer> edsLoadBalancers = new ArrayDeque<>();
|
||||
private final Deque<Helper> edsLbHelpers = new ArrayDeque<>();
|
||||
|
||||
@Mock
|
||||
private Helper helper;
|
||||
@Mock
|
||||
private ChannelLogger channelLogger;
|
||||
|
||||
private LoadBalancer cdsLoadBalancer;
|
||||
private XdsClient xdsClient;
|
||||
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
|
||||
doReturn(channelLogger).when(helper).getChannelLogger();
|
||||
doReturn(syncContext).when(helper).getSynchronizationContext();
|
||||
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
|
||||
lbRegistry.register(fakeXdsLoadBlancerProvider);
|
||||
cdsLoadBalancer = new CdsLoadBalancer(helper, lbRegistry);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canHandleEmptyAddressListFromNameResolution() {
|
||||
assertThat(cdsLoadBalancer.canHandleEmptyAddressListFromNameResolution()).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void invalidConfigType() throws Exception {
|
||||
String lbConfigRaw = "{'cluster' : {}}".replace("'", "\"");
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
|
||||
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
|
||||
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
|
||||
.setAttributes(Attributes.newBuilder()
|
||||
.set(ATTR_LOAD_BALANCING_CONFIG, lbConfig)
|
||||
.set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef)
|
||||
.build())
|
||||
.build();
|
||||
|
||||
cdsLoadBalancer.handleResolvedAddresses(resolvedAddresses);
|
||||
|
||||
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void handleResolutionErrorBeforeOrAfterCdsWorking() throws Exception {
|
||||
String lbConfigRaw1 = "{'cluster' : 'foo.googleapis.com'}".replace("'", "\"");
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, ?> lbConfig1 = (Map<String, ?>) JsonParser.parse(lbConfigRaw1);
|
||||
ResolvedAddresses resolvedAddresses1 = ResolvedAddresses.newBuilder()
|
||||
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
|
||||
.setAttributes(Attributes.newBuilder()
|
||||
.set(ATTR_LOAD_BALANCING_CONFIG, lbConfig1)
|
||||
.set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef)
|
||||
.build())
|
||||
.build();
|
||||
cdsLoadBalancer.handleResolvedAddresses(resolvedAddresses1);
|
||||
ArgumentCaptor<ClusterWatcher> clusterWatcherCaptor1 = ArgumentCaptor.forClass(null);
|
||||
verify(xdsClient).watchClusterData(eq("foo.googleapis.com"), clusterWatcherCaptor1.capture());
|
||||
ClusterWatcher clusterWatcher1 = clusterWatcherCaptor1.getValue();
|
||||
|
||||
// handleResolutionError() before receiving any CDS response.
|
||||
cdsLoadBalancer.handleNameResolutionError(Status.DATA_LOSS.withDescription("fake status"));
|
||||
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
|
||||
|
||||
// CDS response received.
|
||||
clusterWatcher1.onClusterChanged(
|
||||
ClusterUpdate.newBuilder()
|
||||
.setClusterName("foo.googleapis.com")
|
||||
.setEdsServiceName("edsServiceFoo.googleapis.com")
|
||||
.setLbPolicy("round_robin")
|
||||
.setEnableLrs(false)
|
||||
.build());
|
||||
verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
|
||||
|
||||
// handleResolutionError() after receiving CDS response.
|
||||
cdsLoadBalancer.handleNameResolutionError(Status.DATA_LOSS.withDescription("fake status"));
|
||||
// No more TRANSIENT_FAILURE.
|
||||
verify(helper, times(1)).updateBalancingState(
|
||||
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void handleCdsConfigs() throws Exception {
|
||||
assertThat(xdsClient).isNull();
|
||||
|
||||
String lbConfigRaw1 = "{'cluster' : 'foo.googleapis.com'}".replace("'", "\"");
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, ?> lbConfig1 = (Map<String, ?>) JsonParser.parse(lbConfigRaw1);
|
||||
ResolvedAddresses resolvedAddresses1 = ResolvedAddresses.newBuilder()
|
||||
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
|
||||
.setAttributes(Attributes.newBuilder()
|
||||
.set(ATTR_LOAD_BALANCING_CONFIG, lbConfig1)
|
||||
.set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef)
|
||||
.build())
|
||||
.build();
|
||||
cdsLoadBalancer.handleResolvedAddresses(resolvedAddresses1);
|
||||
|
||||
ArgumentCaptor<ClusterWatcher> clusterWatcherCaptor1 = ArgumentCaptor.forClass(null);
|
||||
verify(xdsClient).watchClusterData(eq("foo.googleapis.com"), clusterWatcherCaptor1.capture());
|
||||
|
||||
ClusterWatcher clusterWatcher1 = clusterWatcherCaptor1.getValue();
|
||||
clusterWatcher1.onClusterChanged(
|
||||
ClusterUpdate.newBuilder()
|
||||
.setClusterName("foo.googleapis.com")
|
||||
.setEdsServiceName("edsServiceFoo.googleapis.com")
|
||||
.setLbPolicy("round_robin")
|
||||
.setEnableLrs(false)
|
||||
.build());
|
||||
|
||||
assertThat(edsLbHelpers).hasSize(1);
|
||||
assertThat(edsLoadBalancers).hasSize(1);
|
||||
Helper edsLbHelper1 = edsLbHelpers.poll();
|
||||
LoadBalancer edsLoadBalancer1 = edsLoadBalancers.poll();
|
||||
ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor1 = ArgumentCaptor.forClass(null);
|
||||
verify(edsLoadBalancer1).handleResolvedAddresses(resolvedAddressesCaptor1.capture());
|
||||
XdsConfig expectedXdsConfig = new XdsConfig(
|
||||
null,
|
||||
new LbConfig("round_robin", ImmutableMap.<String, Object>of()),
|
||||
null,
|
||||
"edsServiceFoo.googleapis.com",
|
||||
null);
|
||||
ResolvedAddresses resolvedAddressesFoo = resolvedAddressesCaptor1.getValue();
|
||||
assertThat(resolvedAddressesFoo.getLoadBalancingPolicyConfig()).isEqualTo(expectedXdsConfig);
|
||||
assertThat(resolvedAddressesFoo.getAttributes().get(XdsAttributes.XDS_CLIENT_REF))
|
||||
.isSameInstanceAs(xdsClientRef);
|
||||
|
||||
SubchannelPicker picker1 = mock(SubchannelPicker.class);
|
||||
edsLbHelper1.updateBalancingState(ConnectivityState.READY, picker1);
|
||||
verify(helper).updateBalancingState(ConnectivityState.READY, picker1);
|
||||
|
||||
String lbConfigRaw2 = "{'cluster' : 'bar.googleapis.com'}".replace("'", "\"");
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw2);
|
||||
ResolvedAddresses resolvedAddresses2 = ResolvedAddresses.newBuilder()
|
||||
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
|
||||
.setAttributes(Attributes.newBuilder()
|
||||
.set(ATTR_LOAD_BALANCING_CONFIG, lbConfig2)
|
||||
.set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef)
|
||||
.build())
|
||||
.build();
|
||||
cdsLoadBalancer.handleResolvedAddresses(resolvedAddresses2);
|
||||
|
||||
ArgumentCaptor<ClusterWatcher> clusterWatcherCaptor2 = ArgumentCaptor.forClass(null);
|
||||
verify(xdsClient).watchClusterData(eq("bar.googleapis.com"), clusterWatcherCaptor2.capture());
|
||||
verify(xdsClient).cancelClusterDataWatch("foo.googleapis.com", clusterWatcher1);
|
||||
|
||||
ClusterWatcher clusterWatcher2 = clusterWatcherCaptor2.getValue();
|
||||
clusterWatcher2.onClusterChanged(
|
||||
ClusterUpdate.newBuilder()
|
||||
.setClusterName("bar.googleapis.com")
|
||||
.setEdsServiceName("edsServiceBar.googleapis.com")
|
||||
.setLbPolicy("round_robin")
|
||||
.setEnableLrs(true)
|
||||
.setLrsServerName("lrsBar.googleapis.com")
|
||||
.build());
|
||||
|
||||
assertThat(edsLbHelpers).hasSize(1);
|
||||
assertThat(edsLoadBalancers).hasSize(1);
|
||||
Helper edsLbHelper2 = edsLbHelpers.poll();
|
||||
LoadBalancer edsLoadBalancer2 = edsLoadBalancers.poll();
|
||||
ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor2 = ArgumentCaptor.forClass(null);
|
||||
verify(edsLoadBalancer2).handleResolvedAddresses(resolvedAddressesCaptor2.capture());
|
||||
expectedXdsConfig = new XdsConfig(
|
||||
null,
|
||||
new LbConfig("round_robin", ImmutableMap.<String, Object>of()),
|
||||
null,
|
||||
"edsServiceBar.googleapis.com",
|
||||
"lrsBar.googleapis.com");
|
||||
ResolvedAddresses resolvedAddressesBar = resolvedAddressesCaptor2.getValue();
|
||||
assertThat(resolvedAddressesBar.getLoadBalancingPolicyConfig()).isEqualTo(expectedXdsConfig);
|
||||
assertThat(resolvedAddressesBar.getAttributes().get(XdsAttributes.XDS_CLIENT_REF))
|
||||
.isSameInstanceAs(xdsClientRef);
|
||||
|
||||
SubchannelPicker picker2 = mock(SubchannelPicker.class);
|
||||
edsLbHelper2.updateBalancingState(ConnectivityState.CONNECTING, picker2);
|
||||
verify(helper, never()).updateBalancingState(ConnectivityState.CONNECTING, picker2);
|
||||
verify(edsLoadBalancer1, never()).shutdown();
|
||||
|
||||
picker2 = mock(SubchannelPicker.class);
|
||||
edsLbHelper2.updateBalancingState(ConnectivityState.READY, picker2);
|
||||
verify(helper).updateBalancingState(ConnectivityState.READY, picker2);
|
||||
verify(edsLoadBalancer1).shutdown();
|
||||
|
||||
clusterWatcher2.onClusterChanged(
|
||||
ClusterUpdate.newBuilder()
|
||||
.setClusterName("bar.googleapis.com")
|
||||
.setEdsServiceName("edsServiceBar2.googleapis.com")
|
||||
.setLbPolicy("round_robin")
|
||||
.setEnableLrs(false)
|
||||
.build());
|
||||
verify(edsLoadBalancer2, times(2)).handleResolvedAddresses(resolvedAddressesCaptor2.capture());
|
||||
expectedXdsConfig = new XdsConfig(
|
||||
null,
|
||||
new LbConfig("round_robin", ImmutableMap.<String, Object>of()),
|
||||
null,
|
||||
"edsServiceBar2.googleapis.com",
|
||||
null);
|
||||
ResolvedAddresses resolvedAddressesBar2 = resolvedAddressesCaptor2.getValue();
|
||||
assertThat(resolvedAddressesBar2.getLoadBalancingPolicyConfig()).isEqualTo(expectedXdsConfig);
|
||||
|
||||
cdsLoadBalancer.shutdown();
|
||||
verify(edsLoadBalancer2).shutdown();
|
||||
verify(xdsClient).cancelClusterDataWatch("bar.googleapis.com", clusterWatcher2);
|
||||
assertThat(xdsClientRef.xdsClient).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void clusterWatcher_onErrorCalledBeforeAndAfterOnClusterChanged() throws Exception {
|
||||
String lbConfigRaw = "{'cluster' : 'foo.googleapis.com'}".replace("'", "\"");
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
|
||||
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
|
||||
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
|
||||
.setAttributes(Attributes.newBuilder()
|
||||
.set(ATTR_LOAD_BALANCING_CONFIG, lbConfig)
|
||||
.set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef)
|
||||
.build())
|
||||
.build();
|
||||
cdsLoadBalancer.handleResolvedAddresses(resolvedAddresses);
|
||||
|
||||
ArgumentCaptor<ClusterWatcher> clusterWatcherCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(xdsClient).watchClusterData(eq("foo.googleapis.com"), clusterWatcherCaptor.capture());
|
||||
|
||||
ClusterWatcher clusterWatcher = clusterWatcherCaptor.getValue();
|
||||
|
||||
// Call onError() before onClusterChanged() ever called.
|
||||
clusterWatcher.onError(Status.DATA_LOSS.withDescription("fake status"));
|
||||
assertThat(edsLoadBalancers).isEmpty();
|
||||
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
|
||||
|
||||
clusterWatcher.onClusterChanged(
|
||||
ClusterUpdate.newBuilder()
|
||||
.setClusterName("foo.googleapis.com")
|
||||
.setEdsServiceName("edsServiceFoo.googleapis.com")
|
||||
.setLbPolicy("round_robin")
|
||||
.setEnableLrs(false)
|
||||
.build());
|
||||
|
||||
assertThat(edsLbHelpers).hasSize(1);
|
||||
assertThat(edsLoadBalancers).hasSize(1);
|
||||
Helper edsLbHelper = edsLbHelpers.poll();
|
||||
LoadBalancer edsLoadBalancer = edsLoadBalancers.poll();
|
||||
verify(edsLoadBalancer).handleResolvedAddresses(any(ResolvedAddresses.class));
|
||||
SubchannelPicker picker = mock(SubchannelPicker.class);
|
||||
|
||||
edsLbHelper.updateBalancingState(ConnectivityState.READY, picker);
|
||||
verify(helper).updateBalancingState(ConnectivityState.READY, picker);
|
||||
|
||||
// Call onError() after onClusterChanged().
|
||||
clusterWatcher.onError(Status.DATA_LOSS.withDescription("fake status"));
|
||||
// Verify no more TRANSIENT_FAILURE.
|
||||
verify(helper, times(1))
|
||||
.updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cdsBalancerIntegrateWithEdsBalancer() throws Exception {
|
||||
lbRegistry.deregister(fakeXdsLoadBlancerProvider);
|
||||
lbRegistry.register(new XdsLoadBalancerProvider());
|
||||
|
||||
String lbConfigRaw = "{'cluster' : 'foo.googleapis.com'}".replace("'", "\"");
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
|
||||
ResolvedAddresses resolvedAddresses1 = ResolvedAddresses.newBuilder()
|
||||
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
|
||||
.setAttributes(Attributes.newBuilder()
|
||||
.set(ATTR_LOAD_BALANCING_CONFIG, lbConfig)
|
||||
.set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef)
|
||||
.build())
|
||||
.build();
|
||||
cdsLoadBalancer.handleResolvedAddresses(resolvedAddresses1);
|
||||
ArgumentCaptor<ClusterWatcher> clusterWatcherCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(xdsClient).watchClusterData(eq("foo.googleapis.com"), clusterWatcherCaptor.capture());
|
||||
ClusterWatcher clusterWatcher = clusterWatcherCaptor.getValue();
|
||||
clusterWatcher.onClusterChanged(
|
||||
ClusterUpdate.newBuilder()
|
||||
.setClusterName("foo.googleapis.com")
|
||||
.setEdsServiceName("edsServiceFoo.googleapis.com")
|
||||
.setLbPolicy("round_robin")
|
||||
.setEnableLrs(false)
|
||||
.build());
|
||||
|
||||
ArgumentCaptor<EndpointWatcher> endpointWatcherCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(xdsClient).watchEndpointData(
|
||||
eq("edsServiceFoo.googleapis.com"), endpointWatcherCaptor.capture());
|
||||
EndpointWatcher endpointWatcher = endpointWatcherCaptor.getValue();
|
||||
|
||||
verify(helper, never()).updateBalancingState(
|
||||
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
|
||||
// Update endpoints with all backends unhealthy, the EDS will update channel state to
|
||||
// TRANSIENT_FAILURE.
|
||||
// Not able to test with healthy endpoints because the real EDS balancer is using real
|
||||
// round-robin balancer to balance endpoints.
|
||||
endpointWatcher.onEndpointChanged(EndpointUpdate.newBuilder()
|
||||
.setClusterName("edsServiceFoo.googleapis.com")
|
||||
.addLocalityLbEndpoints(
|
||||
new EnvoyProtoData.Locality("region", "zone", "subzone"),
|
||||
new EnvoyProtoData.LocalityLbEndpoints(
|
||||
// All unhealthy.
|
||||
ImmutableList.of(new EnvoyProtoData.LbEndpoint("127.0.0.1", 8080, 1, false)), 1, 0))
|
||||
.build());
|
||||
verify(helper, atLeastOnce()).updateBalancingState(
|
||||
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
|
||||
|
||||
cdsLoadBalancer.shutdown();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue