Dualstack interop testing enablement (#11231)

* Have java test server listen on appropriate address based upon new optional flag "address_type"
This commit is contained in:
Larry Safran 2024-06-27 16:37:48 -07:00 committed by GitHub
parent 3777c303f5
commit 7a53fa8bc1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 270 additions and 15 deletions

View File

@ -33,15 +33,15 @@ import java.util.Map;
* down the address list and sticks to the first that works.
*/
public final class PickFirstLoadBalancerProvider extends LoadBalancerProvider {
public static final String GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS =
"GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS";
public static final String GRPC_PF_USE_HAPPY_EYEBALLS = "GRPC_PF_USE_HAPPY_EYEBALLS";
private static final String SHUFFLE_ADDRESS_LIST_KEY = "shuffleAddressList";
static boolean enableNewPickFirst =
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST", false);
public static boolean isEnabledHappyEyeballs() {
return GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, false);
return GrpcUtil.getFlag(GRPC_PF_USE_HAPPY_EYEBALLS, false);
}
@VisibleForTesting

View File

@ -142,8 +142,8 @@ public class PickFirstLeafLoadBalancerTest {
@Before
public void setUp() {
originalHappyEyeballsEnabledValue =
System.getProperty(PickFirstLoadBalancerProvider.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS);
System.setProperty(PickFirstLoadBalancerProvider.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS,
System.getProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS);
System.setProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS,
enableHappyEyeballs ? "true" : "false");
for (int i = 1; i <= 5; i++) {
@ -173,9 +173,9 @@ public class PickFirstLeafLoadBalancerTest {
@After
public void tearDown() {
if (originalHappyEyeballsEnabledValue == null) {
System.clearProperty(PickFirstLoadBalancerProvider.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS);
System.clearProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS);
} else {
System.setProperty(PickFirstLoadBalancerProvider.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS,
System.setProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS,
originalHappyEyeballsEnabledValue);
}

View File

@ -22,14 +22,20 @@ import io.grpc.BindableService;
import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptors;
import io.grpc.TlsServerCredentials;
import io.grpc.alts.AltsServerCredentials;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.services.MetricRecorder;
import io.grpc.testing.TlsTesting;
import io.grpc.xds.orca.OrcaMetricReportingServerInterceptor;
import io.grpc.xds.orca.OrcaServiceImpl;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -73,6 +79,7 @@ public class TestServiceServer {
private ScheduledExecutorService executor;
private Server server;
private int localHandshakerPort = -1;
private Util.AddressType addressType = Util.AddressType.IPV4_IPV6;
@VisibleForTesting
void parseArgs(String[] args) {
@ -103,6 +110,8 @@ public class TestServiceServer {
useAlts = Boolean.parseBoolean(value);
} else if ("local_handshaker_port".equals(key)) {
localHandshakerPort = Integer.parseInt(value);
} else if ("address_type".equals(key)) {
addressType = Util.AddressType.valueOf(value.toUpperCase(Locale.ROOT));
} else if ("grpc_version".equals(key)) {
if (!"2".equals(value)) {
System.err.println("Only grpc version 2 is supported");
@ -130,11 +139,14 @@ public class TestServiceServer {
+ "\n --local_handshaker_port=PORT"
+ "\n Use local ALTS handshaker service on the specified port "
+ "\n for testing. Only effective when --use_alts=true."
+ "\n --address_type=IPV4|IPV6|IPV4_IPV6"
+ "\n What type of addresses to listen on. Default IPV4_IPV6"
);
System.exit(1);
}
}
@SuppressWarnings("AddressSelection")
@VisibleForTesting
void start() throws Exception {
executor = Executors.newSingleThreadScheduledExecutor();
@ -156,7 +168,36 @@ public class TestServiceServer {
MetricRecorder metricRecorder = MetricRecorder.newInstance();
BindableService orcaOobService =
OrcaServiceImpl.createService(executor, metricRecorder, 1, TimeUnit.SECONDS);
server = Grpc.newServerBuilderForPort(port, serverCreds)
// Create ServerBuilder with appropriate addresses
// - IPV4_IPV6: bind to wildcard which covers all addresses on all interfaces of both families
// - IPV4: bind to v4 address for local hostname + v4 localhost
// - IPV6: bind to all v6 addresses for local hostname + v6 localhost
ServerBuilder<?> serverBuilder;
switch (addressType) {
case IPV4_IPV6:
serverBuilder = Grpc.newServerBuilderForPort(port, serverCreds);
break;
case IPV4:
SocketAddress v4Address = Util.getV4Address(port);
serverBuilder =
NettyServerBuilder.forAddress(new InetSocketAddress("127.0.0.1", port), serverCreds);
if (v4Address == null) {
((NettyServerBuilder) serverBuilder).addListenAddress(v4Address);
}
break;
case IPV6:
List<SocketAddress> v6Addresses = Util.getV6Addresses(port);
serverBuilder =
NettyServerBuilder.forAddress(new InetSocketAddress("::1", port), serverCreds);
for (SocketAddress address : v6Addresses) {
((NettyServerBuilder)serverBuilder).addListenAddress(address);
}
break;
default:
throw new AssertionError("Unknown address type: " + addressType);
}
server = serverBuilder
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.addService(
ServerInterceptors.intercept(
@ -187,4 +228,5 @@ public class TestServiceServer {
server.awaitTermination();
}
}
}

View File

@ -16,10 +16,17 @@
package io.grpc.testing.integration;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.MessageLite;
import com.google.protobuf.StringValue;
import io.grpc.Metadata;
import io.grpc.protobuf.lite.ProtoLiteUtils;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
@ -66,4 +73,51 @@ public class Util {
}
}
}
static List<SocketAddress> getV6Addresses(int port) throws UnknownHostException {
List<SocketAddress> v6addresses = new ArrayList<>();
InetAddress[] addresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostName());
for (InetAddress address : addresses) {
if (address.getAddress().length != 4) {
v6addresses.add(new java.net.InetSocketAddress(address, port));
}
}
return v6addresses;
}
static SocketAddress getV4Address(int port) throws UnknownHostException {
InetAddress[] addresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostName());
for (InetAddress address : addresses) {
if (address.getAddress().length == 4) {
return new java.net.InetSocketAddress(address, port);
}
}
return null; // means it is v6 only
}
/**
* Picks a port that is not used right at this moment.
* Warning: Not thread safe. May see "BindException: Address already in use: bind" if using the
* returned port to create a new server socket when other threads/processes are concurrently
* creating new sockets without a specific port.
*/
public static int pickUnusedPort() {
try {
ServerSocket serverSocket = new ServerSocket(0);
int port = serverSocket.getLocalPort();
serverSocket.close();
return port;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@VisibleForTesting
enum AddressType {
IPV4,
IPV6,
IPV4_IPV6
}
}

View File

@ -25,13 +25,16 @@ import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.Status;
import io.grpc.gcp.csm.observability.CsmObservability;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.services.HealthStatusManager;
import io.grpc.protobuf.services.ProtoReflectionService;
import io.grpc.services.AdminInterface;
@ -43,9 +46,12 @@ import io.grpc.xds.XdsServerBuilder;
import io.grpc.xds.XdsServerCredentials;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -81,6 +87,7 @@ public final class XdsTestServer {
private Server server;
private Server maintenanceServer;
private String host;
private Util.AddressType addressType = Util.AddressType.IPV4_IPV6;
private CsmObservability csmObservability;
/**
@ -108,7 +115,7 @@ public final class XdsTestServer {
server.blockUntilShutdown();
}
private void parseArgs(String[] args) {
void parseArgs(String[] args) {
boolean usage = false;
for (String arg : args) {
if (!arg.startsWith("--")) {
@ -138,6 +145,8 @@ public final class XdsTestServer {
enableCsmObservability = Boolean.valueOf(value);
} else if ("server_id".equals(key)) {
serverId = value;
} else if ("address_type".equals(key)) {
addressType = Util.AddressType.valueOf(value.toUpperCase(Locale.ROOT));
} else {
System.err.println("Unknown argument: " + key);
usage = true;
@ -173,12 +182,16 @@ public final class XdsTestServer {
+ s.enableCsmObservability
+ "\n --server_id=STRING server ID for response."
+ "\n Default: "
+ s.serverId);
+ s.serverId
+ "\n --address_type=STRING type of IP address to bind to (IPV4|IPV6|IPV4_IPV6)."
+ "\n Default: "
+ s.addressType);
System.exit(1);
}
}
private void start() throws Exception {
@SuppressWarnings("AddressSelection")
void start() throws Exception {
if (enableCsmObservability) {
csmObservability = CsmObservability.newBuilder()
.sdk(AutoConfiguredOpenTelemetrySdk.builder()
@ -199,6 +212,9 @@ public final class XdsTestServer {
}
health = new HealthStatusManager();
if (secureMode) {
if (addressType != Util.AddressType.IPV4_IPV6) {
throw new IllegalArgumentException("Secure mode only supports IPV4_IPV6 address type");
}
maintenanceServer =
Grpc.newServerBuilderForPort(maintenancePort, InsecureServerCredentials.create())
.addService(new XdsUpdateHealthServiceImpl(health))
@ -216,8 +232,36 @@ public final class XdsTestServer {
.build();
server.start();
} else {
ServerBuilder<?> serverBuilder;
ServerCredentials insecureServerCreds = InsecureServerCredentials.create();
switch (addressType) {
case IPV4_IPV6:
serverBuilder = Grpc.newServerBuilderForPort(port, insecureServerCreds);
break;
case IPV4:
SocketAddress v4Address = Util.getV4Address(port);
serverBuilder = NettyServerBuilder.forAddress(
new InetSocketAddress("127.0.0.1", port), insecureServerCreds);
if (v4Address != null) {
((NettyServerBuilder) serverBuilder).addListenAddress(v4Address);
}
break;
case IPV6:
List<SocketAddress> v6Addresses = Util.getV6Addresses(port);
serverBuilder = NettyServerBuilder.forAddress(
new InetSocketAddress("::1", port), insecureServerCreds);
for (SocketAddress address : v6Addresses) {
((NettyServerBuilder)serverBuilder).addListenAddress(address);
}
break;
default:
throw new AssertionError("Unknown address type: " + addressType);
}
logger.info("Starting server on port " + port + " with address type " + addressType);
server =
Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
serverBuilder
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(serverId, host), new TestInfoInterceptor(host)))
@ -232,7 +276,7 @@ public final class XdsTestServer {
health.setStatus("", ServingStatus.SERVING);
}
private void stop() throws Exception {
void stop() throws Exception {
server.shutdownNow();
if (maintenanceServer != null) {
maintenanceServer.shutdownNow();

View File

@ -0,0 +1,113 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.testing.integration;
import static org.junit.Assert.assertEquals;
import io.grpc.Channel;
import io.grpc.ChannelCredentials;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.testing.GrpcCleanupRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Unit tests to make sure that the {@link XdsTestServer} is working as expected.
* Specifically, that for dualstack communication is handled correctly across address families
* and that the test server is correctly handling the address_type flag.
*/
@RunWith(JUnit4.class)
public class XdsTestServerTest {
protected static final EmptyProtos.Empty EMPTY = EmptyProtos.Empty.getDefaultInstance();
@Rule
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
@Test
public void check_ipv4() throws Exception {
checkConnectionWorks("127.0.0.1", "--address_type=IPV4");
}
@Test
public void check_ipv6() throws Exception {
checkConnectionWorks("::1", "--address_type=IPV6");
}
@Test
public void check_ipv4_ipv6() throws Exception {
checkConnectionWorks("localhost", "--address_type=IPV4_IPV6");
}
@Test
public void checkNoAddressType() throws Exception {
// This ensures that all of the other xds tests aren't broken by the address_type argument.
checkConnectionWorks("localhost", null);
}
// Simple test to ensure that communication with the server works which includes starting and
// stopping the server, creating a channel and doing a unary rpc.
private void checkConnectionWorks(String targetServer, String addressTypeArg)
throws Exception {
int port = Util.pickUnusedPort();
XdsTestServer server = getAndStartTestServiceServer(port, addressTypeArg);
try {
ManagedChannel realChannel = createChannel(port, targetServer);
Channel channel = cleanupRule.register(realChannel);
TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel);
assertEquals(EMPTY, stub.emptyCall(EMPTY));
} catch (Exception e) {
throw new AssertionError(e);
} finally {
server.stop();
}
}
private static ManagedChannel createChannel(int port, String target) {
ChannelCredentials creds = InsecureChannelCredentials.create();
ManagedChannelBuilder<?> builder;
if (port == 0) {
builder = Grpc.newChannelBuilder(target, creds);
} else {
builder = Grpc.newChannelBuilderForAddress(target, port, creds);
}
builder.overrideAuthority("foo.test.google.fr");
return builder.build();
}
private static XdsTestServer getAndStartTestServiceServer(int port, String addressTypeArg)
throws Exception {
XdsTestServer server = new XdsTestServer();
String[] args = addressTypeArg != null
? new String[]{"--port=" + port, addressTypeArg}
: new String[]{"--port=" + port};
server.parseArgs(args);
server.start();
return server;
}
}

View File

@ -49,8 +49,9 @@ import javax.annotation.Nullable;
class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
static final String ADS_TYPE_URL_EDS =
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
static final String GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS =
"grpc.experimental.xdsDualstackEndpoints";
public static final String GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS =
"GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS";
private static final XdsEndpointResource instance = new XdsEndpointResource();
@ -201,6 +202,7 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
}
List<java.net.SocketAddress> addresses = new ArrayList<>();
addresses.add(getInetSocketAddress(endpoint.getEndpoint().getAddress()));
if (isEnabledXdsDualStack()) {
for (Endpoint.AdditionalAddress additionalAddress
: endpoint.getEndpoint().getAdditionalAddressesList()) {