grpclb: add serviceName config to grpclb policy config (#6563)

This commit is contained in:
Jihun Cho 2020-02-11 10:27:47 -08:00 committed by GitHub
parent 45bb403f8a
commit 774f2763c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 473 additions and 203 deletions

View File

@ -0,0 +1,83 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.grpclb;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import io.grpc.grpclb.GrpclbState.Mode;
import javax.annotation.Nullable;
final class GrpclbConfig {
private final Mode mode;
@Nullable
private final String serviceName;
private GrpclbConfig(Mode mode, @Nullable String serviceName) {
this.mode = checkNotNull(mode, "mode");
this.serviceName = serviceName;
}
static GrpclbConfig create(Mode mode) {
return create(mode, null);
}
static GrpclbConfig create(Mode mode, @Nullable String serviceName) {
return new GrpclbConfig(mode, serviceName);
}
Mode getMode() {
return mode;
}
/**
* If specified, it overrides the name of the sevice name to be sent to the balancer. if not, the
* target to be sent to the balancer will continue to be obtained from the target URI passed
* to the gRPC client channel.
*/
@Nullable
String getServiceName() {
return serviceName;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
GrpclbConfig that = (GrpclbConfig) o;
return mode == that.mode && Objects.equal(serviceName, that.serviceName);
}
@Override
public int hashCode() {
return Objects.hashCode(mode, serviceName);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("mode", mode)
.add("serviceName", serviceName)
.toString();
}
}

View File

@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
@ -30,15 +29,10 @@ import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.grpclb.GrpclbState.Mode;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.internal.TimeProvider;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
@ -48,8 +42,8 @@ import javax.annotation.Nullable;
* or round-robin balancer.
*/
class GrpclbLoadBalancer extends LoadBalancer {
private static final Mode DEFAULT_MODE = Mode.ROUND_ROBIN;
private static final Logger logger = Logger.getLogger(GrpclbLoadBalancer.class.getName());
private static final GrpclbConfig DEFAULT_CONFIG = GrpclbConfig.create(Mode.ROUND_ROBIN);
private final Helper helper;
private final TimeProvider time;
@ -57,7 +51,7 @@ class GrpclbLoadBalancer extends LoadBalancer {
private final SubchannelPool subchannelPool;
private final BackoffPolicy.Provider backoffPolicyProvider;
private Mode mode = Mode.ROUND_ROBIN;
private GrpclbConfig config = DEFAULT_CONFIG;
// All mutable states in this class are mutated ONLY from Channel Executor
@Nullable
@ -88,7 +82,6 @@ class GrpclbLoadBalancer extends LoadBalancer {
}
@Override
@SuppressWarnings("deprecation") // TODO(creamsoup) migrate to use parsed service config
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
Attributes attributes = resolvedAddresses.getAttributes();
List<EquivalentAddressGroup> newLbAddresses = attributes.get(GrpclbConstants.ATTR_LB_ADDRS);
@ -114,11 +107,13 @@ class GrpclbLoadBalancer extends LoadBalancer {
newLbAddressGroups = Collections.unmodifiableList(newLbAddressGroups);
List<EquivalentAddressGroup> newBackendServers =
Collections.unmodifiableList(resolvedAddresses.getAddresses());
Map<String, ?> rawLbConfigValue = attributes.get(ATTR_LOAD_BALANCING_CONFIG);
Mode newMode = retrieveModeFromLbConfig(rawLbConfigValue, helper.getChannelLogger());
if (!mode.equals(newMode)) {
mode = newMode;
helper.getChannelLogger().log(ChannelLogLevel.INFO, "Mode: " + newMode);
GrpclbConfig newConfig = (GrpclbConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
if (newConfig == null) {
newConfig = DEFAULT_CONFIG;
}
if (!config.equals(newConfig)) {
config = newConfig;
helper.getChannelLogger().log(ChannelLogLevel.INFO, "Config: " + newConfig);
recreateStates();
}
grpclbState.handleAddresses(newLbAddressGroups, newBackendServers);
@ -131,40 +126,6 @@ class GrpclbLoadBalancer extends LoadBalancer {
}
}
@VisibleForTesting
static Mode retrieveModeFromLbConfig(
@Nullable Map<String, ?> rawLbConfigValue, ChannelLogger channelLogger) {
try {
if (rawLbConfigValue == null) {
return DEFAULT_MODE;
}
List<?> rawChildPolicies = getList(rawLbConfigValue, "childPolicy");
if (rawChildPolicies == null) {
return DEFAULT_MODE;
}
List<LbConfig> childPolicies =
ServiceConfigUtil.unwrapLoadBalancingConfigList(checkObjectList(rawChildPolicies));
for (LbConfig childPolicy : childPolicies) {
String childPolicyName = childPolicy.getPolicyName();
switch (childPolicyName) {
case "round_robin":
return Mode.ROUND_ROBIN;
case "pick_first":
return Mode.PICK_FIRST;
default:
channelLogger.log(
ChannelLogLevel.DEBUG,
"grpclb ignoring unsupported child policy " + childPolicyName);
}
}
} catch (RuntimeException e) {
channelLogger.log(ChannelLogLevel.WARNING, "Bad grpclb config, using " + DEFAULT_MODE);
logger.log(
Level.WARNING, "Bad grpclb config: " + rawLbConfigValue + ", using " + DEFAULT_MODE, e);
}
return DEFAULT_MODE;
}
private void resetStates() {
if (grpclbState != null) {
grpclbState.shutdown();
@ -175,8 +136,8 @@ class GrpclbLoadBalancer extends LoadBalancer {
private void recreateStates() {
resetStates();
checkState(grpclbState == null, "Should've been cleared");
grpclbState = new GrpclbState(mode, helper, subchannelPool, time, stopwatch,
backoffPolicyProvider);
grpclbState =
new GrpclbState(config, helper, subchannelPool, time, stopwatch, backoffPolicyProvider);
}
@Override
@ -201,37 +162,4 @@ class GrpclbLoadBalancer extends LoadBalancer {
GrpclbState getGrpclbState() {
return grpclbState;
}
// TODO(carl-mastrangelo): delete getList and checkObjectList once apply is complete for SVCCFG.
/**
* Gets a list from an object for the given key. Copy of
* {@link io.grpc.internal.ServiceConfigUtil#getList}.
*/
@Nullable
private static List<?> getList(Map<String, ?> obj, String key) {
assert key != null;
if (!obj.containsKey(key)) {
return null;
}
Object value = obj.get(key);
if (!(value instanceof List)) {
throw new ClassCastException(
String.format("value '%s' for key '%s' in %s is not List", value, key, obj));
}
return (List<?>) value;
}
/**
* Copy of {@link io.grpc.internal.ServiceConfigUtil#checkObjectList}.
*/
@SuppressWarnings("unchecked")
private static List<Map<String, ?>> checkObjectList(List<?> rawList) {
for (int i = 0; i < rawList.size(); i++) {
if (!(rawList.get(i) instanceof Map)) {
throw new ClassCastException(
String.format("value %s for idx %d in %s is not object", rawList.get(i), i, rawList));
}
}
return (List<Map<String, ?>>) rawList;
}
}

View File

@ -24,12 +24,13 @@ import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.grpclb.GrpclbState.Mode;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.JsonUtil;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.internal.TimeProvider;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
/**
* The provider for the "grpclb" balancing policy. This class should not be directly referenced in
@ -38,6 +39,7 @@ import javax.annotation.Nullable;
*/
@Internal
public final class GrpclbLoadBalancerProvider extends LoadBalancerProvider {
private static final Mode DEFAULT_MODE = Mode.ROUND_ROBIN;
@Override
@ -78,57 +80,37 @@ public final class GrpclbLoadBalancerProvider extends LoadBalancerProvider {
ConfigOrError parseLoadBalancingConfigPolicyInternal(
Map<String, ?> rawLoadBalancingPolicyConfig) {
if (rawLoadBalancingPolicyConfig == null) {
return ConfigOrError.fromConfig(DEFAULT_MODE);
return ConfigOrError.fromConfig(GrpclbConfig.create(DEFAULT_MODE));
}
List<?> rawChildPolicies = getList(rawLoadBalancingPolicyConfig, "childPolicy");
if (rawChildPolicies == null) {
return ConfigOrError.fromConfig(DEFAULT_MODE);
String serviceName = JsonUtil.getString(rawLoadBalancingPolicyConfig, "serviceName");
List<?> rawChildPolicies = JsonUtil.getList(rawLoadBalancingPolicyConfig, "childPolicy");
List<LbConfig> childPolicies = null;
if (rawChildPolicies != null) {
childPolicies =
ServiceConfigUtil
.unwrapLoadBalancingConfigList(JsonUtil.checkObjectList(rawChildPolicies));
}
List<LbConfig> childPolicies =
ServiceConfigUtil.unwrapLoadBalancingConfigList(checkObjectList(rawChildPolicies));
if (childPolicies == null || childPolicies.isEmpty()) {
return ConfigOrError.fromConfig(GrpclbConfig.create(DEFAULT_MODE, serviceName));
}
List<String> policiesTried = new ArrayList<>();
for (LbConfig childPolicy : childPolicies) {
String childPolicyName = childPolicy.getPolicyName();
switch (childPolicyName) {
case "round_robin":
return ConfigOrError.fromConfig(Mode.ROUND_ROBIN);
return ConfigOrError.fromConfig(GrpclbConfig.create(Mode.ROUND_ROBIN, serviceName));
case "pick_first":
return ConfigOrError.fromConfig(Mode.PICK_FIRST);
return ConfigOrError.fromConfig(GrpclbConfig.create(Mode.PICK_FIRST, serviceName));
default:
// TODO(zhangkun83): maybe log?
policiesTried.add(childPolicyName);
}
}
return ConfigOrError.fromConfig(DEFAULT_MODE);
}
/**
* Gets a list from an object for the given key. Copy of
* {@link io.grpc.internal.ServiceConfigUtil#getList}.
*/
@Nullable
private static List<?> getList(Map<String, ?> obj, String key) {
assert key != null;
if (!obj.containsKey(key)) {
return null;
}
Object value = obj.get(key);
if (!(value instanceof List)) {
throw new ClassCastException(
String.format("value '%s' for key '%s' in %s is not List", value, key, obj));
}
return (List<?>) value;
}
/**
* Copy of {@link io.grpc.internal.ServiceConfigUtil#checkObjectList}.
*/
@SuppressWarnings("unchecked")
private static List<Map<String, ?>> checkObjectList(List<?> rawList) {
for (int i = 0; i < rawList.size(); i++) {
if (!(rawList.get(i) instanceof Map)) {
throw new ClassCastException(
String.format("value %s for idx %d in %s is not object", rawList.get(i), i, rawList));
}
}
return (List<Map<String, ?>>) rawList;
return ConfigOrError.fromError(
Status
.INVALID_ARGUMENT
.withDescription(
"None of " + policiesTried + " specified child policies are available."));
}
}

View File

@ -142,7 +142,7 @@ final class GrpclbState {
@Nullable
private LbStream lbStream;
private Map<List<EquivalentAddressGroup>, Subchannel> subchannels = Collections.emptyMap();
private final Mode mode;
private final GrpclbConfig config;
// Has the same size as the round-robin list from the balancer.
// A drop entry from the round-robin list becomes a DropEntry here.
@ -154,22 +154,27 @@ final class GrpclbState {
new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY));
GrpclbState(
Mode mode,
GrpclbConfig config,
Helper helper,
SubchannelPool subchannelPool,
TimeProvider time,
Stopwatch stopwatch,
BackoffPolicy.Provider backoffPolicyProvider) {
this.mode = checkNotNull(mode, "mode");
this.config = checkNotNull(config, "config");
this.helper = checkNotNull(helper, "helper");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.subchannelPool =
mode == Mode.ROUND_ROBIN ? checkNotNull(subchannelPool, "subchannelPool") : null;
config.getMode() == Mode.ROUND_ROBIN
? checkNotNull(subchannelPool, "subchannelPool") : null;
this.time = checkNotNull(time, "time provider");
this.stopwatch = checkNotNull(stopwatch, "stopwatch");
this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority");
if (config.getServiceName() != null) {
this.serviceName = config.getServiceName();
} else {
this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority");
}
this.logger = checkNotNull(helper.getChannelLogger(), "logger");
}
@ -183,7 +188,7 @@ final class GrpclbState {
}
return;
}
if (mode == Mode.ROUND_ROBIN && newState.getState() == IDLE) {
if (config.getMode() == Mode.ROUND_ROBIN && newState.getState() == IDLE) {
subchannel.requestConnection();
}
subchannel.getAttributes().get(STATE_INFO).set(newState);
@ -328,7 +333,7 @@ final class GrpclbState {
void shutdown() {
shutdownLbComm();
switch (mode) {
switch (config.getMode()) {
case ROUND_ROBIN:
// We close the subchannels through subchannelPool instead of helper just for convenience of
// testing.
@ -344,7 +349,7 @@ final class GrpclbState {
}
break;
default:
throw new AssertionError("Missing case for " + mode);
throw new AssertionError("Missing case for " + config.getMode());
}
subchannels = Collections.emptyMap();
cancelFallbackTimer();
@ -385,7 +390,7 @@ final class GrpclbState {
new HashMap<>();
List<BackendEntry> newBackendList = new ArrayList<>();
switch (mode) {
switch (config.getMode()) {
case ROUND_ROBIN:
for (BackendAddressGroup backendAddr : newBackendAddrList) {
EquivalentAddressGroup eag = backendAddr.getAddresses();
@ -448,7 +453,7 @@ final class GrpclbState {
new BackendEntry(subchannel, new TokenAttachingTracerFactory(loadRecorder)));
break;
default:
throw new AssertionError("Missing case for " + mode);
throw new AssertionError("Missing case for " + config.getMode());
}
dropList = Collections.unmodifiableList(newDropList);
@ -693,7 +698,7 @@ final class GrpclbState {
private void maybeUpdatePicker() {
List<RoundRobinEntry> pickList;
ConnectivityState state;
switch (mode) {
switch (config.getMode()) {
case ROUND_ROBIN:
pickList = new ArrayList<>(backendList.size());
Status error = null;
@ -751,7 +756,7 @@ final class GrpclbState {
}
break;
default:
throw new AssertionError("Missing case for " + mode);
throw new AssertionError("Missing case for " + config.getMode());
}
maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList));
}

View File

@ -0,0 +1,162 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.grpclb;
import static com.google.common.truth.Truth.assertThat;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.grpclb.GrpclbState.Mode;
import io.grpc.internal.JsonParser;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class GrpclbLoadBalancerProviderTest {
private final GrpclbLoadBalancerProvider provider = new GrpclbLoadBalancerProvider();
@Test
public void retrieveModeFromLbConfig_pickFirst() throws Exception {
String lbConfig = "{\"childPolicy\" : [{\"pick_first\" : {}}, {\"round_robin\" : {}}]}";
ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
assertThat(configOrError.getConfig()).isNotNull();
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.PICK_FIRST);
assertThat(config.getServiceName()).isNull();
}
@Test
public void retrieveModeFromLbConfig_roundRobin() throws Exception {
String lbConfig = "{\"childPolicy\" : [{\"round_robin\" : {}}, {\"pick_first\" : {}}]}";
ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
assertThat(configOrError.getConfig()).isNotNull();
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.ROUND_ROBIN);
assertThat(config.getServiceName()).isNull();
}
@Test
public void retrieveModeFromLbConfig_nullConfigUseRoundRobin() throws Exception {
ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(null);
assertThat(configOrError.getConfig()).isNotNull();
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.ROUND_ROBIN);
assertThat(config.getServiceName()).isNull();
}
@Test
public void retrieveModeFromLbConfig_emptyConfigUseRoundRobin() throws Exception {
String lbConfig = "{}";
ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
assertThat(configOrError.getConfig()).isNotNull();
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.ROUND_ROBIN);
assertThat(config.getServiceName()).isNull();
}
@Test
public void retrieveModeFromLbConfig_emptyChildPolicyUseRoundRobin() throws Exception {
String lbConfig = "{\"childPolicy\" : []}";
ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
assertThat(configOrError.getConfig()).isNotNull();
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.ROUND_ROBIN);
assertThat(config.getServiceName()).isNull();
}
@Test
public void retrieveModeFromLbConfig_unsupportedChildPolicy()
throws Exception {
String lbConfig = "{\"childPolicy\" : [ {\"nonono\" : {}} ]}";
ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
assertThat(configOrError.getError()).isNotNull();
assertThat(configOrError.getError().getDescription())
.contains("None of [nonono] specified child policies are available.");
}
@Test
public void retrieveModeFromLbConfig_skipUnsupportedChildPolicy() throws Exception {
String lbConfig = "{\"childPolicy\" : [ {\"nono\" : {}}, {\"pick_first\" : {} } ]}";
ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
assertThat(configOrError.getConfig()).isNotNull();
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.PICK_FIRST);
assertThat(config.getServiceName()).isNull();
}
@Test
public void retrieveModeFromLbConfig_skipUnsupportedChildPolicyWithTarget() throws Exception {
String lbConfig = "{\"childPolicy\" : [ {\"nono\" : {}}, {\"pick_first\" : {}} ],"
+ "\"serviceName\": \"foo.google.com\"}";
ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
assertThat(configOrError.getConfig()).isNotNull();
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.PICK_FIRST);
assertThat(config.getServiceName()).isEqualTo("foo.google.com");
}
@Test
public void retrieveModeFromLbConfig_wrongChildPolicyType() throws Exception {
String lbConfig = "{\"childPolicy\" : {}}";
ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
assertThat(configOrError.getError()).isNotNull();
assertThat(configOrError.getError().getCause()).hasMessageThat().contains("is not List");
}
@Test
public void retrieveModeFromLbConfig_wrongChildPolicyTypeWithTarget() throws Exception {
String lbConfig = "{\"childPolicy\" : {}, \"serviceName\": \"foo.google.com\"}";
ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
assertThat(configOrError.getError()).isNotNull();
assertThat(configOrError.getError().getCause()).hasMessageThat().contains("is not List");
}
@SuppressWarnings("unchecked")
private static Map<String, ?> parseJsonObject(String json) throws Exception {
return (Map<String, ?>) JsonParser.parse(json);
}
}

View File

@ -22,7 +22,6 @@ import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.grpclb.GrpclbLoadBalancer.retrieveModeFromLbConfig;
import static io.grpc.grpclb.GrpclbState.BUFFER_ENTRY;
import static io.grpc.grpclb.GrpclbState.DROP_PICK_RESULT;
import static org.junit.Assert.assertEquals;
@ -181,6 +180,8 @@ public class GrpclbLoadBalancerTest {
throw new AssertionError(e);
}
});
private final GrpclbLoadBalancerProvider grpclbLoadBalancerProvider =
new GrpclbLoadBalancerProvider();
private static final ClientStreamTracer.StreamInfo STREAM_INFO =
ClientStreamTracer.StreamInfo.newBuilder().build();
@ -2057,74 +2058,167 @@ public class GrpclbLoadBalancerTest {
}
@Test
public void retrieveModeFromLbConfig_pickFirst() throws Exception {
String lbConfig = "{\"childPolicy\" : [{\"pick_first\" : {}}, {\"round_robin\" : {}}]}";
@SuppressWarnings("deprecation")
public void switchMode_nullLbPolicy() throws Exception {
InOrder inOrder = inOrder(helper);
Mode mode = retrieveModeFromLbConfig(parseJsonObject(lbConfig), channelLogger);
assertThat(logs).isEmpty();
assertThat(mode).isEqualTo(Mode.PICK_FIRST);
final List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList,
Attributes.EMPTY,
/* grpclbConfig= */ null);
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
// Simulate receiving LB response
List<ServerEntry> backends1 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends1));
// ROUND_ROBIN: create one subchannel per server
verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
assertEquals(2, mockSubchannels.size());
Subchannel subchannel1 = mockSubchannels.poll();
Subchannel subchannel2 = mockSubchannels.poll();
verify(subchannelPool, never())
.returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
// Switch to PICK_FIRST
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList,
Attributes.EMPTY,
GrpclbConfig.create(Mode.PICK_FIRST));
// GrpclbState will be shutdown, and a new one will be created
assertThat(oobChannel.isShutdown()).isTrue();
verify(subchannelPool)
.returnSubchannel(same(subchannel1), eq(ConnectivityStateInfo.forNonError(IDLE)));
verify(subchannelPool)
.returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE)));
// A new LB stream is created
assertEquals(1, fakeOobChannels.size());
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
// Simulate receiving LB response
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends1));
// PICK_FIRST Subchannel
// TODO(zhangkun83): remove the deprecation suppression on this method once migrated to
// the new createSubchannel().
inOrder.verify(helper).createSubchannel(
eq(Arrays.asList(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))),
any(Attributes.class));
inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
}
@SuppressWarnings("deprecation")
@Test
public void retrieveModeFromLbConfig_roundRobin() throws Exception {
String lbConfig = "{\"childPolicy\" : [{\"round_robin\" : {}}, {\"pick_first\" : {}}]}";
public void switchServiceName() throws Exception {
InOrder inOrder = inOrder(helper);
Mode mode = retrieveModeFromLbConfig(parseJsonObject(lbConfig), channelLogger);
assertThat(logs).isEmpty();
assertThat(mode).isEqualTo(Mode.ROUND_ROBIN);
}
String lbConfig = "{\"serviceName\": \"foo.google.com\"}";
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig))
.build();
@Test
public void retrieveModeFromLbConfig_nullConfigUseRoundRobin() throws Exception {
Mode mode = retrieveModeFromLbConfig(null, channelLogger);
assertThat(logs).isEmpty();
assertThat(mode).isEqualTo(Mode.ROUND_ROBIN);
}
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList, grpclbResolutionAttrs);
@Test
public void retrieveModeFromLbConfig_emptyConfigUseRoundRobin() throws Exception {
String lbConfig = "{}";
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName("foo.google.com").build())
.build()));
Mode mode = retrieveModeFromLbConfig(parseJsonObject(lbConfig), channelLogger);
assertThat(logs).isEmpty();
assertThat(mode).isEqualTo(Mode.ROUND_ROBIN);
}
// Simulate receiving LB response
List<ServerEntry> backends1 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends1));
@Test
public void retrieveModeFromLbConfig_emptyChildPolicyUseRoundRobin() throws Exception {
String lbConfig = "{\"childPolicy\" : []}";
// ROUND_ROBIN: create one subchannel per server
verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
assertEquals(2, mockSubchannels.size());
Subchannel subchannel1 = mockSubchannels.poll();
Subchannel subchannel2 = mockSubchannels.poll();
verify(subchannelPool, never())
.returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
Mode mode = retrieveModeFromLbConfig(parseJsonObject(lbConfig), channelLogger);
assertThat(logs).isEmpty();
assertThat(mode).isEqualTo(Mode.ROUND_ROBIN);
}
// Switch to different serviceName
lbConfig = "{\"serviceName\": \"bar.google.com\"}";
grpclbResolutionAttrs = Attributes.newBuilder().set(
LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build();
List<EquivalentAddressGroup> newGrpclbResolutionList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
newGrpclbResolutionList,
grpclbResolutionAttrs);
@Test
public void retrieveModeFromLbConfig_unsupportedChildPolicyUseRoundRobin()
throws Exception {
String lbConfig = "{\"childPolicy\" : [ {\"nonono\" : {}} ]}";
// GrpclbState will be shutdown, and a new one will be created
assertThat(oobChannel.isShutdown()).isTrue();
verify(subchannelPool)
.returnSubchannel(same(subchannel1), eq(ConnectivityStateInfo.forNonError(IDLE)));
verify(subchannelPool)
.returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE)));
Mode mode = retrieveModeFromLbConfig(parseJsonObject(lbConfig), channelLogger);
assertThat(logs).containsExactly("DEBUG: grpclb ignoring unsupported child policy nonono");
assertThat(mode).isEqualTo(Mode.ROUND_ROBIN);
}
@Test
public void retrieveModeFromLbConfig_skipUnsupportedChildPolicy() throws Exception {
String lbConfig = "{\"childPolicy\" : [ {\"nono\" : {}}, {\"pick_first\" : {} } ]}";
Mode mode = retrieveModeFromLbConfig(parseJsonObject(lbConfig), channelLogger);
assertThat(logs).containsExactly("DEBUG: grpclb ignoring unsupported child policy nono");
assertThat(mode).isEqualTo(Mode.PICK_FIRST);
}
@Test
public void retrieveModeFromLbConfig_badConfigDefaultToRoundRobin() throws Exception {
String lbConfig = "{\"childPolicy\" : {}}";
Mode mode = retrieveModeFromLbConfig(parseJsonObject(lbConfig), channelLogger);
assertThat(logs).containsExactly("WARNING: Bad grpclb config, using ROUND_ROBIN");
assertThat(mode).isEqualTo(Mode.ROUND_ROBIN);
assertEquals(1, fakeOobChannels.size());
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName("bar.google.com").build())
.build()));
}
@Test
@ -2352,21 +2446,37 @@ public class GrpclbLoadBalancerTest {
});
}
@SuppressWarnings("deprecation") // TODO(creamsoup) migrate test cases to use GrpclbConfig.
private void deliverResolvedAddresses(
List<EquivalentAddressGroup> backendAddrs,
List<EquivalentAddressGroup> balancerAddrs,
Attributes attrs) {
GrpclbConfig grpclbConfig;
Map<String, ?> lbJsonMap = attrs.get(LoadBalancer.ATTR_LOAD_BALANCING_CONFIG);
if (lbJsonMap != null) {
grpclbConfig = (GrpclbConfig) grpclbLoadBalancerProvider
.parseLoadBalancingPolicyConfig(lbJsonMap).getConfig();
} else {
grpclbConfig = GrpclbConfig.create(Mode.ROUND_ROBIN);
}
deliverResolvedAddresses(backendAddrs, balancerAddrs, attrs, grpclbConfig);
}
private void deliverResolvedAddresses(
final List<EquivalentAddressGroup> backendAddrs,
final List<EquivalentAddressGroup> balancerAddrs,
Attributes attrs) {
if (!balancerAddrs.isEmpty()) {
attrs = attrs.toBuilder().set(GrpclbConstants.ATTR_LB_ADDRS, balancerAddrs).build();
}
final Attributes finalAttrs = attrs;
List<EquivalentAddressGroup> balancerAddrs,
Attributes attributes,
final GrpclbConfig grpclbConfig) {
final Attributes attrs =
attributes.toBuilder().set(GrpclbConstants.ATTR_LB_ADDRS, balancerAddrs).build();
syncContext.execute(new Runnable() {
@Override
public void run() {
balancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(backendAddrs)
.setAttributes(finalAttrs)
.setAttributes(attrs)
.setLoadBalancingPolicyConfig(grpclbConfig)
.build());
}
});