core: PF Index.size() should be number of addresses

This would impact TRANSIENT_FAILURE and refreshNameResolver() logic for
dual-stack endpoints.
This commit is contained in:
Eric Anderson 2024-08-15 16:59:27 -07:00
parent 6dbd1b9d5a
commit c120e364d2
2 changed files with 63 additions and 6 deletions

View File

@ -208,7 +208,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
} }
subchannels.clear(); subchannels.clear();
if (addressIndex != null) { if (addressIndex != null) {
addressIndex.updateGroups(null); addressIndex.updateGroups(ImmutableList.of());
} }
rawConnectivityState = TRANSIENT_FAILURE; rawConnectivityState = TRANSIENT_FAILURE;
updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
@ -566,11 +566,12 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
@VisibleForTesting @VisibleForTesting
static final class Index { static final class Index {
private List<EquivalentAddressGroup> addressGroups; private List<EquivalentAddressGroup> addressGroups;
private int size;
private int groupIndex; private int groupIndex;
private int addressIndex; private int addressIndex;
public Index(List<EquivalentAddressGroup> groups) { public Index(List<EquivalentAddressGroup> groups) {
this.addressGroups = groups != null ? groups : Collections.emptyList(); updateGroups(groups);
} }
public boolean isValid() { public boolean isValid() {
@ -629,9 +630,14 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
/** /**
* Update to new groups, resetting the current index. * Update to new groups, resetting the current index.
*/ */
public void updateGroups(ImmutableList<EquivalentAddressGroup> newGroups) { public void updateGroups(List<EquivalentAddressGroup> newGroups) {
addressGroups = newGroups != null ? newGroups : Collections.emptyList(); addressGroups = checkNotNull(newGroups, "newGroups");
reset(); reset();
int size = 0;
for (EquivalentAddressGroup eag : newGroups) {
size += eag.getAddresses().size();
}
this.size = size;
} }
/** /**
@ -652,7 +658,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
} }
public int size() { public int size() {
return (addressGroups != null) ? addressGroups.size() : 0; return size;
} }
} }

View File

@ -662,7 +662,7 @@ public class PickFirstLeafLoadBalancerTest {
} }
@Test @Test
public void nameResolutionAfterSufficientTFs() { public void nameResolutionAfterSufficientTFs_multipleEags() {
InOrder inOrder = inOrder(mockHelper); InOrder inOrder = inOrder(mockHelper);
acceptXSubchannels(3); acceptXSubchannels(3);
Status error = Status.UNAVAILABLE.withDescription("boom!"); Status error = Status.UNAVAILABLE.withDescription("boom!");
@ -707,6 +707,57 @@ public class PickFirstLeafLoadBalancerTest {
inOrder.verify(mockHelper).refreshNameResolution(); inOrder.verify(mockHelper).refreshNameResolution();
} }
@Test
public void nameResolutionAfterSufficientTFs_singleEag() {
InOrder inOrder = inOrder(mockHelper);
EquivalentAddressGroup eag = new EquivalentAddressGroup(Arrays.asList(
new FakeSocketAddress("server1"),
new FakeSocketAddress("server2"),
new FakeSocketAddress("server3")));
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(Arrays.asList(eag)).build());
Status error = Status.UNAVAILABLE.withDescription("boom!");
// Initial subchannel gets TF, LB is still in CONNECTING
verify(mockSubchannel1).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener1 = stateListenerCaptor.getValue();
stateListener1.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
// Second subchannel gets TF, no UpdateBalancingState called
verify(mockSubchannel2).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener2 = stateListenerCaptor.getValue();
stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper, never()).refreshNameResolution();
inOrder.verify(mockHelper, never()).updateBalancingState(any(), any());
// Third subchannel gets TF, LB goes into TRANSIENT_FAILURE and does a refreshNameResolution
verify(mockSubchannel3).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener3 = stateListenerCaptor.getValue();
stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
inOrder.verify(mockHelper).refreshNameResolution();
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
// Only after we have TFs reported for # of subchannels do we call refreshNameResolution
stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper, never()).refreshNameResolution();
stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper, never()).refreshNameResolution();
stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).refreshNameResolution();
// Now that we have refreshed, the count should have been reset
// Only after we have TFs reported for # of subchannels do we call refreshNameResolution
stateListener1.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper, never()).refreshNameResolution();
stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper, never()).refreshNameResolution();
stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).refreshNameResolution();
}
@Test @Test
public void nameResolutionSuccessAfterError() { public void nameResolutionSuccessAfterError() {
loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError")); loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError"));