xds: Disallow duplicate addresses in the RingHashLB. (#9776)

* xds: Disallow duplicate addresses in the RingHashLB.
Removed test that was previously checking for specific expected behavior with duplicate addresses.
This commit is contained in:
Larry Safran 2023-01-03 21:11:46 +00:00 committed by GitHub
parent 3c5c2be712
commit 51ee3eb6ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 40 deletions

View File

@ -26,6 +26,8 @@ import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.primitives.UnsignedInteger; import com.google.common.primitives.UnsignedInteger;
import io.grpc.Attributes; import io.grpc.Attributes;
@ -38,14 +40,17 @@ import io.grpc.Status;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.net.SocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
@ -173,6 +178,13 @@ final class RingHashLoadBalancer extends LoadBalancer {
return false; return false;
} }
String dupAddrString = validateNoDuplicateAddresses(addrList);
if (dupAddrString != null) {
handleNameResolutionError(Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS "
+ "resolution was successful, but there were duplicate addresses: " + dupAddrString));
return false;
}
long totalWeight = 0; long totalWeight = 0;
for (EquivalentAddressGroup eag : addrList) { for (EquivalentAddressGroup eag : addrList) {
Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT); Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT);
@ -207,6 +219,28 @@ final class RingHashLoadBalancer extends LoadBalancer {
return true; return true;
} }
@Nullable
private String validateNoDuplicateAddresses(List<EquivalentAddressGroup> addrList) {
Set<SocketAddress> addresses = new HashSet<>();
Multiset<String> dups = HashMultiset.create();
for (EquivalentAddressGroup eag : addrList) {
for (SocketAddress address : eag.getAddresses()) {
if (!addresses.add(address)) {
dups.add(address.toString());
}
}
}
if (!dups.isEmpty()) {
return dups.entrySet().stream()
.map((dup) ->
String.format("Address: %s, count: %d", dup.getElement(), dup.getCount() + 1))
.collect(Collectors.joining("; "));
}
return null;
}
private static List<RingEntry> buildRing( private static List<RingEntry> buildRing(
Map<EquivalentAddressGroup, Long> serverWeights, long totalWeight, double scale) { Map<EquivalentAddressGroup, Long> serverWeights, long totalWeight, double scale) {
List<RingEntry> ring = new ArrayList<>(); List<RingEntry> ring = new ArrayList<>();

View File

@ -552,9 +552,7 @@ public class RingHashLoadBalancerTest {
// "[FakeSocketAddress-server2]_0" // "[FakeSocketAddress-server2]_0"
long rpcHash = hashFunc.hashAsciiString("[FakeSocketAddress-server0]_0"); long rpcHash = hashFunc.hashAsciiString("[FakeSocketAddress-server0]_0");
PickSubchannelArgs args = new PickSubchannelArgsImpl( PickSubchannelArgs args = getDefaultPickSubchannelArgs(rpcHash);
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash));
// Bring down server0 to force trying server2. // Bring down server0 to force trying server2.
deliverSubchannelState( deliverSubchannelState(
@ -592,6 +590,12 @@ public class RingHashLoadBalancerTest {
assertThat(result.getSubchannel().getAddresses()).isEqualTo(servers.get(2)); assertThat(result.getSubchannel().getAddresses()).isEqualTo(servers.get(2));
} }
private PickSubchannelArgsImpl getDefaultPickSubchannelArgs(long rpcHash) {
return new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash));
}
@Test @Test
public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() { public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() {
// Map each server address to exactly one ring entry. // Map each server address to exactly one ring entry.
@ -1039,43 +1043,6 @@ public class RingHashLoadBalancerTest {
assertThat(ratio12).isWithin(0.03).of((double) 10 / 100); assertThat(ratio12).isWithin(0.03).of((double) 10 / 100);
} }
@Test
public void hostSelectionProportionalToRepeatedAddressCount() {
RingHashConfig config = new RingHashConfig(10000, 100000);
List<EquivalentAddressGroup> servers = createRepeatedServerAddrs(1, 10, 100); // 1:10:100
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
assertThat(addressesAccepted).isTrue();
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
// Bring all subchannels to READY.
Map<EquivalentAddressGroup, Integer> pickCounts = new HashMap<>();
for (Subchannel subchannel : subchannels.values()) {
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
pickCounts.put(subchannel.getAddresses(), 0);
}
verify(helper, times(3)).updateBalancingState(eq(READY), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
for (int i = 0; i < 10000; i++) {
long hash = hashFunc.hashInt(i);
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hash));
Subchannel pickedSubchannel = picker.pickSubchannel(args).getSubchannel();
EquivalentAddressGroup addr = pickedSubchannel.getAddresses();
pickCounts.put(addr, pickCounts.get(addr) + 1);
}
// Actual distribution: server0 = 104, server1 = 808, server2 = 9088
double ratio01 = (double) pickCounts.get(servers.get(0)) / pickCounts.get(servers.get(1));
double ratio12 = (double) pickCounts.get(servers.get(1)) / pickCounts.get(servers.get(11));
assertThat(ratio01).isWithin(0.03).of((double) 1 / 10);
assertThat(ratio12).isWithin(0.03).of((double) 10 / 100);
}
@Test @Test
public void nameResolutionErrorWithNoActiveSubchannels() { public void nameResolutionErrorWithNoActiveSubchannels() {
Status error = Status.UNAVAILABLE.withDescription("not reachable"); Status error = Status.UNAVAILABLE.withDescription("not reachable");
@ -1112,6 +1079,29 @@ public class RingHashLoadBalancerTest {
verifyNoMoreInteractions(helper); verifyNoMoreInteractions(helper);
} }
@Test
public void duplicateAddresses() {
RingHashConfig config = new RingHashConfig(10, 100);
List<EquivalentAddressGroup> servers = createRepeatedServerAddrs(1, 2, 3);
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
assertThat(addressesAccepted).isFalse();
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isFalse(); // fail the RPC
assertThat(result.getStatus().getCode())
.isEqualTo(Code.UNAVAILABLE); // with error status for the original server hit by hash
assertThat(result.getStatus().getDescription()).isEqualTo(
"Ring hash lb error: EDS resolution was successful, but there were duplicate "
+ "addresses: Address: FakeSocketAddress-server1, count: 2; "
+ "Address: FakeSocketAddress-server2, count: 3");
}
private void deliverSubchannelState(Subchannel subchannel, ConnectivityStateInfo state) { private void deliverSubchannelState(Subchannel subchannel, ConnectivityStateInfo state) {
subchannelStateListeners.get(subchannel).onSubchannelState(state); subchannelStateListeners.get(subchannel).onSubchannelState(state);
} }