diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 4b36523000..55440fde8e 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -81,13 +81,13 @@ final class RingHashLoadBalancer extends LoadBalancer { } @Override - public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); List addrList = resolvedAddresses.getAddresses(); if (addrList.isEmpty()) { handleNameResolutionError(Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS " + "resolution was successful, but returned server addresses are empty.")); - return; + return false; } Map latestAddrs = stripAttrs(addrList); Set removedAddrs = @@ -162,6 +162,8 @@ final class RingHashLoadBalancer extends LoadBalancer { for (Subchannel subchann : removedSubchannels) { shutdownSubchannel(subchann); } + + return true; } private static List buildRing( diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index 9cfa00bc84..aae297d1ac 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -156,9 +156,10 @@ public class RingHashLoadBalancerTest { public void subchannelLazyConnectUntilPicked() { RingHashConfig config = new RingHashConfig(10, 100); List servers = createWeightedServerAddrs(1); // one server - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); Subchannel subchannel = Iterables.getOnlyElement(subchannels.values()); verify(subchannel, never()).requestConnection(); @@ -187,9 +188,10 @@ public class RingHashLoadBalancerTest { public void subchannelNotAutoReconnectAfterReenteringIdle() { RingHashConfig config = new RingHashConfig(10, 100); List servers = createWeightedServerAddrs(1); // one server - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); Subchannel subchannel = Iterables.getOnlyElement(subchannels.values()); InOrder inOrder = Mockito.inOrder(helper, subchannel); inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); @@ -217,9 +219,10 @@ public class RingHashLoadBalancerTest { RingHashConfig config = new RingHashConfig(10, 100); List servers = createWeightedServerAddrs(1, 1); InOrder inOrder = Mockito.inOrder(helper); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); inOrder.verify(helper, times(2)).createSubchannel(any(CreateSubchannelArgs.class)); inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); @@ -278,9 +281,10 @@ public class RingHashLoadBalancerTest { RingHashConfig config = new RingHashConfig(10, 100); List servers = createWeightedServerAddrs(1, 1, 1, 1); InOrder inOrder = Mockito.inOrder(helper); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); inOrder.verify(helper, times(4)).createSubchannel(any(CreateSubchannelArgs.class)); inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); @@ -336,9 +340,10 @@ public class RingHashLoadBalancerTest { public void subchannelStayInTransientFailureUntilBecomeReady() { RingHashConfig config = new RingHashConfig(10, 100); List servers = createWeightedServerAddrs(1, 1, 1); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); reset(helper); @@ -378,9 +383,10 @@ public class RingHashLoadBalancerTest { RingHashConfig config = new RingHashConfig(10, 100); List servers = createWeightedServerAddrs(1, 1, 1); InOrder inOrder = Mockito.inOrder(helper); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); @@ -394,9 +400,10 @@ public class RingHashLoadBalancerTest { verifyConnection(1); servers = createWeightedServerAddrs(1,1); - loadBalancer.handleResolvedAddresses( + addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); inOrder.verify(helper) .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); verifyConnection(1); @@ -422,9 +429,10 @@ public class RingHashLoadBalancerTest { public void ignoreShutdownSubchannelStateChange() { RingHashConfig config = new RingHashConfig(10, 100); List servers = createWeightedServerAddrs(1, 1, 1); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); @@ -442,9 +450,10 @@ public class RingHashLoadBalancerTest { public void deterministicPickWithHostsPartiallyRemoved() { RingHashConfig config = new RingHashConfig(10, 100); List servers = createWeightedServerAddrs(1, 1, 1, 1, 1); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); InOrder inOrder = Mockito.inOrder(helper); inOrder.verify(helper, times(5)).createSubchannel(any(CreateSubchannelArgs.class)); inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); @@ -470,9 +479,10 @@ public class RingHashLoadBalancerTest { Attributes attr = addr.getAttributes().toBuilder().set(CUSTOM_KEY, "custom value").build(); updatedServers.add(new EquivalentAddressGroup(addr.getAddresses(), attr)); } - loadBalancer.handleResolvedAddresses( + addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(updatedServers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(subchannels.get(Collections.singletonList(servers.get(0)))) .updateAddresses(Collections.singletonList(updatedServers.get(0))); verify(subchannels.get(Collections.singletonList(servers.get(1)))) @@ -487,9 +497,10 @@ public class RingHashLoadBalancerTest { public void deterministicPickWithNewHostsAdded() { RingHashConfig config = new RingHashConfig(10, 100); List servers = createWeightedServerAddrs(1, 1); // server0 and server1 - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); InOrder inOrder = Mockito.inOrder(helper); inOrder.verify(helper, times(2)).createSubchannel(any(CreateSubchannelArgs.class)); inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); @@ -511,9 +522,10 @@ public class RingHashLoadBalancerTest { assertThat(subchannel.getAddresses()).isEqualTo(servers.get(1)); servers = createWeightedServerAddrs(1, 1, 1, 1, 1); // server2, server3, server4 added - loadBalancer.handleResolvedAddresses( + addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); inOrder.verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); assertThat(pickerCaptor.getValue().pickSubchannel(args).getSubchannel()) @@ -526,9 +538,10 @@ public class RingHashLoadBalancerTest { // Map each server address to exactly one ring entry. RingHashConfig config = new RingHashConfig(3, 3); List servers = createWeightedServerAddrs(1, 1, 1); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); // initial IDLE reset(helper); @@ -583,9 +596,10 @@ public class RingHashLoadBalancerTest { // Map each server address to exactly one ring entry. RingHashConfig config = new RingHashConfig(3, 3); List servers = createWeightedServerAddrs(1, 1, 1); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); // initial IDLE reset(helper); @@ -649,9 +663,10 @@ public class RingHashLoadBalancerTest { // Map each server address to exactly one ring entry. RingHashConfig config = new RingHashConfig(3, 3); List servers = createWeightedServerAddrs(1, 1, 1); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); @@ -687,9 +702,10 @@ public class RingHashLoadBalancerTest { // Map each server address to exactly one ring entry. RingHashConfig config = new RingHashConfig(3, 3); List servers = createWeightedServerAddrs(1, 1, 1); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); @@ -718,9 +734,10 @@ public class RingHashLoadBalancerTest { // Map each server address to exactly one ring entry. RingHashConfig config = new RingHashConfig(3, 3); List servers = createWeightedServerAddrs(1, 1, 1); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); @@ -749,9 +766,10 @@ public class RingHashLoadBalancerTest { // Map each server address to exactly one ring entry. RingHashConfig config = new RingHashConfig(3, 3); List servers = createWeightedServerAddrs(1, 1, 1); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); // ring: @@ -784,9 +802,10 @@ public class RingHashLoadBalancerTest { // Map each server address to exactly one ring entry. RingHashConfig config = new RingHashConfig(3, 3); List servers = createWeightedServerAddrs(1, 1, 1); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); // ring: @@ -822,9 +841,10 @@ public class RingHashLoadBalancerTest { // Map each server address to exactly one ring entry. RingHashConfig config = new RingHashConfig(3, 3); List servers = createWeightedServerAddrs(1, 1, 1); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); // ring: @@ -864,9 +884,10 @@ public class RingHashLoadBalancerTest { // Map each server address to exactly one ring entry. RingHashConfig config = new RingHashConfig(3, 3); List servers = createWeightedServerAddrs(1, 1, 1); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); // ring: @@ -908,9 +929,10 @@ public class RingHashLoadBalancerTest { // Map each server address to exactly one ring entry. RingHashConfig config = new RingHashConfig(3, 3); List servers = createWeightedServerAddrs(1, 1, 1); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); @@ -943,9 +965,10 @@ public class RingHashLoadBalancerTest { public void hostSelectionProportionalToWeights() { RingHashConfig config = new RingHashConfig(10000, 100000); // large ring List servers = createWeightedServerAddrs(1, 10, 100); // 1:10:100 - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); @@ -979,9 +1002,10 @@ public class RingHashLoadBalancerTest { public void hostSelectionProportionalToRepeatedAddressCount() { RingHashConfig config = new RingHashConfig(10000, 100000); List servers = createRepeatedServerAddrs(1, 10, 100); // 1:10:100 - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); @@ -1027,9 +1051,10 @@ public class RingHashLoadBalancerTest { public void nameResolutionErrorWithActiveSubchannels() { RingHashConfig config = new RingHashConfig(10, 100); List servers = createWeightedServerAddrs(1); - loadBalancer.handleResolvedAddresses( + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isTrue(); verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());