xds: cluster_resolver LB policy should wait until all clusters are resolved before propagating endpoints to child LB policy (#8176)

Do not propagate partial endpoint discovery results to the child LB policy of the cluster_resolver LB policy. This avoids premature RPC failures when connections to already-resolved endpoints fail while other endpoints are still unresolved. Also, endpoints should be attempted in the order of the clusters they belong to: endpoints from a lower-priority cluster should not be used before endpoints from a higher-priority cluster have been attempted. Most importantly, the policy should not fall back to using DNS-resolved endpoints before all EDS-resolved endpoints have failed.
This commit is contained in:
Chengyuan Zhang 2021-05-18 13:14:37 -07:00 committed by GitHub
parent e5d0e9d9a8
commit 86465b3399
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 106 additions and 78 deletions

View File

@ -17,11 +17,9 @@
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
@ -211,30 +209,35 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
List<EquivalentAddressGroup> addresses = new ArrayList<>();
Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
List<String> priorities = new ArrayList<>(); // totally ordered priority list
boolean allResolved = true;
Status endpointNotFound = Status.OK;
for (String cluster : clusters) {
ClusterState state = clusterStates.get(cluster);
if (!state.resolved) {
allResolved = false;
continue;
// Propagate endpoints to the child LB policy only after all clusters have been resolved.
if (!state.resolved && state.status.isOk()) {
return;
}
if (state.result != null) {
addresses.addAll(state.result.addresses);
priorityChildConfigs.putAll(state.result.priorityChildConfigs);
priorities.addAll(state.result.priorities);
} else {
endpointNotFound = state.status;
}
}
if (addresses.isEmpty()) {
if (endpointNotFound.isOk()) {
endpointNotFound = Status.UNAVAILABLE.withDescription(
"No usable endpoint from cluster(s): " + clusters);
} else {
endpointNotFound =
Status.UNAVAILABLE.withCause(endpointNotFound.getCause())
.withDescription(endpointNotFound.getDescription());
}
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(endpointNotFound));
if (childLb != null) {
childLb.shutdown();
childLb = null;
}
if (allResolved) {
Status unavailable = Status.UNAVAILABLE.withDescription("No usable endpoint");
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable));
} else {
helper.updateBalancingState(CONNECTING, BUFFER_PICKER);
}
return;
}
PriorityLbConfig childConfig =
@ -252,14 +255,15 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
private void handleEndpointResolutionError() {
boolean allInError = true;
Status error = null;
for (ClusterState state : clusterStates.values()) {
if (state.status.isOk()) {
allInError = false;
} else {
error = state.status;
}
}
if (allInError) {
// Propagate the error status of the last cluster. This is the best we can do.
Status error = clusterStates.get(clusters.get(clusters.size() - 1)).status;
if (childLb != null) {
childLb.handleNameResolutionError(error);
} else {
@ -581,10 +585,17 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
return;
}
status = error;
// NameResolver.Listener API cannot distinguish transient errors, we should avoid
// waiting for DNS addresses indefinitely.
resolved = true;
handleEndpointResolutionError();
// NameResolver.Listener API cannot distinguish between address-not-found and
// transient errors. If the error occurs in the first resolution, treat it as
// address not found. Otherwise, there are either previously resolved addresses
// or a previously encountered error; propagate the error to downstream/upstream
// and let downstream/upstream handle it.
if (!resolved) {
resolved = true;
handleEndpointResourceUpdate();
} else {
handleEndpointResolutionError();
}
if (scheduledRefresh != null && scheduledRefresh.isPending()) {
return;
}

View File

@ -290,42 +290,53 @@ public class ClusterResolverLoadBalancerTest {
String priority2 = CLUSTER2 + "[priority2]";
String priority3 = CLUSTER1 + "[priority1]";
// First deliver CLUSTER2's endpoints, two priorities with each has one locality.
// CLUSTER2: locality1 with priority 1 and locality3 with priority 2.
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME2,
ImmutableMap.of(locality1, localityLbEndpoints1, locality3, localityLbEndpoints3));
assertThat(childBalancers).isEmpty(); // not created until all clusters resolved
// CLUSTER1: locality2 with priority 1.
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality2, localityLbEndpoints2));
// Endpoints of all clusters have been resolved.
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME);
PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config;
assertThat(priorityLbConfig.priorities).containsExactly(priority1, priority2).inOrder();
PriorityChildConfig priorityChildConfig = priorityLbConfig.childConfigs.get(priority1);
assertThat(priorityChildConfig.ignoreReresolution).isTrue();
assertThat(priorityChildConfig.policySelection.getProvider().getPolicyName())
.isEqualTo(CLUSTER_IMPL_POLICY_NAME);
ClusterImplConfig clusterImplConfig =
(ClusterImplConfig) priorityChildConfig.policySelection.getConfig();
assertClusterImplConfig(clusterImplConfig, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_NAME, 200L,
tlsContext, Collections.<DropOverload>emptyList(), WEIGHTED_TARGET_POLICY_NAME);
WeightedTargetConfig weightedTargetConfig =
(WeightedTargetConfig) clusterImplConfig.childPolicy.getConfig();
assertThat(weightedTargetConfig.targets.keySet()).containsExactly(locality1.toString());
WeightedPolicySelection target = weightedTargetConfig.targets.get(locality1.toString());
assertThat(target.weight).isEqualTo(70);
assertThat(target.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin");
assertThat(priorityLbConfig.priorities)
.containsExactly(priority3, priority1, priority2).inOrder();
priorityChildConfig = priorityLbConfig.childConfigs.get(priority2);
assertThat(priorityChildConfig.ignoreReresolution).isTrue();
assertThat(priorityChildConfig.policySelection.getProvider().getPolicyName())
PriorityChildConfig priorityChildConfig1 = priorityLbConfig.childConfigs.get(priority1);
assertThat(priorityChildConfig1.ignoreReresolution).isTrue();
assertThat(priorityChildConfig1.policySelection.getProvider().getPolicyName())
.isEqualTo(CLUSTER_IMPL_POLICY_NAME);
clusterImplConfig = (ClusterImplConfig) priorityChildConfig.policySelection.getConfig();
assertClusterImplConfig(clusterImplConfig, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_NAME, 200L,
ClusterImplConfig clusterImplConfig1 =
(ClusterImplConfig) priorityChildConfig1.policySelection.getConfig();
assertClusterImplConfig(clusterImplConfig1, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_NAME, 200L,
tlsContext, Collections.<DropOverload>emptyList(), WEIGHTED_TARGET_POLICY_NAME);
weightedTargetConfig = (WeightedTargetConfig) clusterImplConfig.childPolicy.getConfig();
assertThat(weightedTargetConfig.targets.keySet()).containsExactly(locality3.toString());
target = weightedTargetConfig.targets.get(locality3.toString());
assertThat(target.weight).isEqualTo(20);
assertThat(target.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin");
WeightedTargetConfig weightedTargetConfig1 =
(WeightedTargetConfig) clusterImplConfig1.childPolicy.getConfig();
assertThat(weightedTargetConfig1.targets.keySet()).containsExactly(locality1.toString());
WeightedPolicySelection target1 = weightedTargetConfig1.targets.get(locality1.toString());
assertThat(target1.weight).isEqualTo(70);
assertThat(target1.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin");
PriorityChildConfig priorityChildConfig2 = priorityLbConfig.childConfigs.get(priority2);
assertThat(priorityChildConfig2.ignoreReresolution).isTrue();
assertThat(priorityChildConfig2.policySelection.getProvider().getPolicyName())
.isEqualTo(CLUSTER_IMPL_POLICY_NAME);
ClusterImplConfig clusterImplConfig2 =
(ClusterImplConfig) priorityChildConfig2.policySelection.getConfig();
assertClusterImplConfig(clusterImplConfig2, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_NAME, 200L,
tlsContext, Collections.<DropOverload>emptyList(), WEIGHTED_TARGET_POLICY_NAME);
WeightedTargetConfig weightedTargetConfig2 =
(WeightedTargetConfig) clusterImplConfig2.childPolicy.getConfig();
assertThat(weightedTargetConfig2.targets.keySet()).containsExactly(locality3.toString());
WeightedPolicySelection target2 = weightedTargetConfig2.targets.get(locality3.toString());
assertThat(target2.weight).isEqualTo(20);
assertThat(target2.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin");
List<EquivalentAddressGroup> priorityAddrs1 =
AddressFilter.filter(childBalancer.addresses, priority1);
assertThat(priorityAddrs1).hasSize(2);
@ -335,26 +346,20 @@ public class ClusterResolverLoadBalancerTest {
assertThat(priorityAddrs2).hasSize(1);
assertAddressesEqual(Collections.singletonList(endpoint4), priorityAddrs2);
// Then deliver CLUSTER1's endpoints, one priority with one locality.
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality2, localityLbEndpoints2));
priorityLbConfig = (PriorityLbConfig) childBalancer.config;
assertThat(priorityLbConfig.priorities)
.containsExactly(priority3, priority1, priority2).inOrder();
priorityChildConfig = priorityLbConfig.childConfigs.get(priority3);
assertThat(priorityChildConfig.ignoreReresolution).isTrue();
assertThat(priorityChildConfig.policySelection.getProvider().getPolicyName())
PriorityChildConfig priorityChildConfig3 = priorityLbConfig.childConfigs.get(priority3);
assertThat(priorityChildConfig3.ignoreReresolution).isTrue();
assertThat(priorityChildConfig3.policySelection.getProvider().getPolicyName())
.isEqualTo(CLUSTER_IMPL_POLICY_NAME);
clusterImplConfig = (ClusterImplConfig) priorityChildConfig.policySelection.getConfig();
assertClusterImplConfig(clusterImplConfig, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L,
ClusterImplConfig clusterImplConfig3 =
(ClusterImplConfig) priorityChildConfig3.policySelection.getConfig();
assertClusterImplConfig(clusterImplConfig3, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L,
tlsContext, Collections.<DropOverload>emptyList(), WEIGHTED_TARGET_POLICY_NAME);
weightedTargetConfig = (WeightedTargetConfig) clusterImplConfig.childPolicy.getConfig();
assertThat(weightedTargetConfig.targets.keySet()).containsExactly(locality2.toString());
target = weightedTargetConfig.targets.get(locality2.toString());
assertThat(target.weight).isEqualTo(10);
assertThat(target.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin");
WeightedTargetConfig weightedTargetConfig3 =
(WeightedTargetConfig) clusterImplConfig3.childPolicy.getConfig();
assertThat(weightedTargetConfig3.targets.keySet()).containsExactly(locality2.toString());
WeightedPolicySelection target3 = weightedTargetConfig3.targets.get(locality2.toString());
assertThat(target3.weight).isEqualTo(10);
assertThat(target3.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin");
List<EquivalentAddressGroup> priorityAddrs3 =
AddressFilter.filter(childBalancer.addresses, priority3);
assertThat(priorityAddrs3).hasSize(1);
@ -370,16 +375,17 @@ public class ClusterResolverLoadBalancerTest {
assertThat(childBalancers).isEmpty();
reset(helper);
xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1);
verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().isOk()).isTrue();
assertThat(result.getSubchannel()).isNull(); // buffer picker expected
verify(helper, never()).updateBalancingState(
any(ConnectivityState.class), any(SubchannelPicker.class)); // wait for CLUSTER2's results
xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME2);
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
Status expectedError = Status.UNAVAILABLE.withDescription("No usable endpoint");
assertPicker(pickerCaptor.getValue(), expectedError, null);
assertPicker(
pickerCaptor.getValue(),
Status.UNAVAILABLE.withDescription(
"No usable endpoint from cluster(s): " + Arrays.asList(CLUSTER1, CLUSTER2)),
null);
}
@Test
@ -413,7 +419,8 @@ public class ClusterResolverLoadBalancerTest {
xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1);
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
Status expectedError = Status.UNAVAILABLE.withDescription("No usable endpoint");
Status expectedError = Status.UNAVAILABLE.withDescription(
"No usable endpoint from cluster(s): " + Arrays.asList(CLUSTER1, CLUSTER2));
assertPicker(pickerCaptor.getValue(), expectedError, null);
}
@ -507,8 +514,11 @@ public class ClusterResolverLoadBalancerTest {
assertThat(childBalancers).isEmpty();
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
assertPicker(pickerCaptor.getValue(),
Status.UNAVAILABLE.withDescription("No usable endpoint"), null);
assertPicker(
pickerCaptor.getValue(),
Status.UNAVAILABLE.withDescription(
"No usable endpoint from cluster(s): " + Collections.singleton(CLUSTER1)),
null);
}
@Test
@ -691,10 +701,8 @@ public class ClusterResolverLoadBalancerTest {
assertThat(childBalancers).isEmpty();
reset(helper);
xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1);
verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().isOk()).isTrue();
assertThat(result.getSubchannel()).isNull(); // buffer picker expected, waiting for DNS
verify(helper, never()).updateBalancingState(
any(ConnectivityState.class), any(SubchannelPicker.class)); // wait for DNS results
FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
@ -737,7 +745,7 @@ public class ClusterResolverLoadBalancerTest {
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
assertPicker(pickerCaptor.getValue(),
Status.UNAVAILABLE.withDescription("No usable endpoint"), null);
Status.UNAVAILABLE.withDescription("I am lost"), null);
}
@Test
@ -756,10 +764,16 @@ public class ClusterResolverLoadBalancerTest {
10 /* localityWeight */, 1 /* priority */);
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); // child LB created
assertThat(childBalancers).isEmpty(); // not created until all clusters resolved.
FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
resolver.deliverError(Status.UNKNOWN.withDescription("I am lost"));
// DNS resolution failed, but there are EDS endpoints that can be used.
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); // child LB created
assertThat(childBalancer.upstreamError).isNull(); // should not propagate error to child LB
assertAddressesEqual(Collections.singletonList(endpoint), childBalancer.addresses);
xdsClient.deliverError(Status.RESOURCE_EXHAUSTED.withDescription("out of memory"));
assertThat(childBalancer.upstreamError).isNotNull(); // last cluster's (DNS) error propagated
assertThat(childBalancer.upstreamError.getCode()).isEqualTo(Code.UNKNOWN);
@ -787,7 +801,10 @@ public class ClusterResolverLoadBalancerTest {
resolver.deliverError(dnsError);
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
assertPicker(pickerCaptor.getValue(), dnsError, null);
assertPicker(
pickerCaptor.getValue(),
Status.UNAVAILABLE.withDescription(dnsError.getDescription()),
null);
}
@Test