mirror of https://github.com/grpc/grpc-java.git
xds: ring_hash to use acceptResolvedAddresses() (#9617)
Part of a migration to move all LBs away from handleResolvedAddresses()
This commit is contained in:
parent
a97db60fd7
commit
a65ecef538
|
@ -81,13 +81,13 @@ final class RingHashLoadBalancer extends LoadBalancer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
|
||||
List<EquivalentAddressGroup> addrList = resolvedAddresses.getAddresses();
|
||||
if (addrList.isEmpty()) {
|
||||
handleNameResolutionError(Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS "
|
||||
+ "resolution was successful, but returned server addresses are empty."));
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
Map<EquivalentAddressGroup, EquivalentAddressGroup> latestAddrs = stripAttrs(addrList);
|
||||
Set<EquivalentAddressGroup> removedAddrs =
|
||||
|
@ -162,6 +162,8 @@ final class RingHashLoadBalancer extends LoadBalancer {
|
|||
for (Subchannel subchann : removedSubchannels) {
|
||||
shutdownSubchannel(subchann);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private static List<RingEntry> buildRing(
|
||||
|
|
|
@ -156,9 +156,10 @@ public class RingHashLoadBalancerTest {
|
|||
public void subchannelLazyConnectUntilPicked() {
|
||||
RingHashConfig config = new RingHashConfig(10, 100);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1); // one server
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
Subchannel subchannel = Iterables.getOnlyElement(subchannels.values());
|
||||
verify(subchannel, never()).requestConnection();
|
||||
|
@ -187,9 +188,10 @@ public class RingHashLoadBalancerTest {
|
|||
public void subchannelNotAutoReconnectAfterReenteringIdle() {
|
||||
RingHashConfig config = new RingHashConfig(10, 100);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1); // one server
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
Subchannel subchannel = Iterables.getOnlyElement(subchannels.values());
|
||||
InOrder inOrder = Mockito.inOrder(helper, subchannel);
|
||||
inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
|
||||
|
@ -217,9 +219,10 @@ public class RingHashLoadBalancerTest {
|
|||
RingHashConfig config = new RingHashConfig(10, 100);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1);
|
||||
InOrder inOrder = Mockito.inOrder(helper);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
inOrder.verify(helper, times(2)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
||||
|
@ -278,9 +281,10 @@ public class RingHashLoadBalancerTest {
|
|||
RingHashConfig config = new RingHashConfig(10, 100);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1, 1);
|
||||
InOrder inOrder = Mockito.inOrder(helper);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
inOrder.verify(helper, times(4)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
||||
|
@ -336,9 +340,10 @@ public class RingHashLoadBalancerTest {
|
|||
public void subchannelStayInTransientFailureUntilBecomeReady() {
|
||||
RingHashConfig config = new RingHashConfig(10, 100);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
reset(helper);
|
||||
|
@ -378,9 +383,10 @@ public class RingHashLoadBalancerTest {
|
|||
RingHashConfig config = new RingHashConfig(10, 100);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
|
||||
InOrder inOrder = Mockito.inOrder(helper);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
||||
|
@ -394,9 +400,10 @@ public class RingHashLoadBalancerTest {
|
|||
verifyConnection(1);
|
||||
|
||||
servers = createWeightedServerAddrs(1,1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
|
||||
verifyConnection(1);
|
||||
|
@ -422,9 +429,10 @@ public class RingHashLoadBalancerTest {
|
|||
public void ignoreShutdownSubchannelStateChange() {
|
||||
RingHashConfig config = new RingHashConfig(10, 100);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
||||
|
@ -442,9 +450,10 @@ public class RingHashLoadBalancerTest {
|
|||
public void deterministicPickWithHostsPartiallyRemoved() {
|
||||
RingHashConfig config = new RingHashConfig(10, 100);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1, 1, 1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
InOrder inOrder = Mockito.inOrder(helper);
|
||||
inOrder.verify(helper, times(5)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
@ -470,9 +479,10 @@ public class RingHashLoadBalancerTest {
|
|||
Attributes attr = addr.getAttributes().toBuilder().set(CUSTOM_KEY, "custom value").build();
|
||||
updatedServers.add(new EquivalentAddressGroup(addr.getAddresses(), attr));
|
||||
}
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(updatedServers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(subchannels.get(Collections.singletonList(servers.get(0))))
|
||||
.updateAddresses(Collections.singletonList(updatedServers.get(0)));
|
||||
verify(subchannels.get(Collections.singletonList(servers.get(1))))
|
||||
|
@ -487,9 +497,10 @@ public class RingHashLoadBalancerTest {
|
|||
public void deterministicPickWithNewHostsAdded() {
|
||||
RingHashConfig config = new RingHashConfig(10, 100);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1); // server0 and server1
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
InOrder inOrder = Mockito.inOrder(helper);
|
||||
inOrder.verify(helper, times(2)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
|
||||
|
@ -511,9 +522,10 @@ public class RingHashLoadBalancerTest {
|
|||
assertThat(subchannel.getAddresses()).isEqualTo(servers.get(1));
|
||||
|
||||
servers = createWeightedServerAddrs(1, 1, 1, 1, 1); // server2, server3, server4 added
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
inOrder.verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||
assertThat(pickerCaptor.getValue().pickSubchannel(args).getSubchannel())
|
||||
|
@ -526,9 +538,10 @@ public class RingHashLoadBalancerTest {
|
|||
// Map each server address to exactly one ring entry.
|
||||
RingHashConfig config = new RingHashConfig(3, 3);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); // initial IDLE
|
||||
reset(helper);
|
||||
|
@ -583,9 +596,10 @@ public class RingHashLoadBalancerTest {
|
|||
// Map each server address to exactly one ring entry.
|
||||
RingHashConfig config = new RingHashConfig(3, 3);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); // initial IDLE
|
||||
reset(helper);
|
||||
|
@ -649,9 +663,10 @@ public class RingHashLoadBalancerTest {
|
|||
// Map each server address to exactly one ring entry.
|
||||
RingHashConfig config = new RingHashConfig(3, 3);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
||||
|
@ -687,9 +702,10 @@ public class RingHashLoadBalancerTest {
|
|||
// Map each server address to exactly one ring entry.
|
||||
RingHashConfig config = new RingHashConfig(3, 3);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
||||
|
@ -718,9 +734,10 @@ public class RingHashLoadBalancerTest {
|
|||
// Map each server address to exactly one ring entry.
|
||||
RingHashConfig config = new RingHashConfig(3, 3);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
||||
|
@ -749,9 +766,10 @@ public class RingHashLoadBalancerTest {
|
|||
// Map each server address to exactly one ring entry.
|
||||
RingHashConfig config = new RingHashConfig(3, 3);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
// ring:
|
||||
|
@ -784,9 +802,10 @@ public class RingHashLoadBalancerTest {
|
|||
// Map each server address to exactly one ring entry.
|
||||
RingHashConfig config = new RingHashConfig(3, 3);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
// ring:
|
||||
|
@ -822,9 +841,10 @@ public class RingHashLoadBalancerTest {
|
|||
// Map each server address to exactly one ring entry.
|
||||
RingHashConfig config = new RingHashConfig(3, 3);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
// ring:
|
||||
|
@ -864,9 +884,10 @@ public class RingHashLoadBalancerTest {
|
|||
// Map each server address to exactly one ring entry.
|
||||
RingHashConfig config = new RingHashConfig(3, 3);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
// ring:
|
||||
|
@ -908,9 +929,10 @@ public class RingHashLoadBalancerTest {
|
|||
// Map each server address to exactly one ring entry.
|
||||
RingHashConfig config = new RingHashConfig(3, 3);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
||||
|
@ -943,9 +965,10 @@ public class RingHashLoadBalancerTest {
|
|||
public void hostSelectionProportionalToWeights() {
|
||||
RingHashConfig config = new RingHashConfig(10000, 100000); // large ring
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 10, 100); // 1:10:100
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
||||
|
@ -979,9 +1002,10 @@ public class RingHashLoadBalancerTest {
|
|||
public void hostSelectionProportionalToRepeatedAddressCount() {
|
||||
RingHashConfig config = new RingHashConfig(10000, 100000);
|
||||
List<EquivalentAddressGroup> servers = createRepeatedServerAddrs(1, 10, 100); // 1:10:100
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||
|
||||
|
@ -1027,9 +1051,10 @@ public class RingHashLoadBalancerTest {
|
|||
public void nameResolutionErrorWithActiveSubchannels() {
|
||||
RingHashConfig config = new RingHashConfig(10, 100);
|
||||
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1);
|
||||
loadBalancer.handleResolvedAddresses(
|
||||
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
|
||||
assertThat(addressesAccepted).isTrue();
|
||||
verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
|
||||
|
||||
|
|
Loading…
Reference in New Issue