interop-testing: Move interop tests only used by test client to the test client

This removes the auth dependency and the implicit xds/orca from
AbstractInteropTest for things that weren't used in all but one case.
This commit is contained in:
Eric Anderson 2024-04-02 11:41:04 -07:00
parent 2fe1a13cd0
commit 6c9f92a725
4 changed files with 346 additions and 326 deletions

View File

@ -69,7 +69,6 @@ dependencies {
implementation project(':grpc-android'),
project(':grpc-core'),
project(':grpc-auth'),
project(':grpc-census'),
project(':grpc-okhttp'),
project(':grpc-protobuf-lite'),
@ -81,10 +80,6 @@ dependencies {
libraries.androidx.test.rules,
libraries.opencensus.contrib.grpc.metrics
implementation (libraries.google.auth.oauth2Http) {
exclude group: 'org.apache.httpcomponents'
}
implementation (project(':grpc-services')) {
exclude group: 'com.google.protobuf'
exclude group: 'com.google.guava'

View File

@ -21,18 +21,12 @@ import static com.google.common.truth.Truth.assertWithMessage;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.ComputeEngineCredentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
@ -45,7 +39,6 @@ import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.Grpc;
@ -62,7 +55,6 @@ import io.grpc.ServerInterceptors;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.census.InternalCensusStatsAccessor;
import io.grpc.census.internal.DeprecatedCensusConstants;
import io.grpc.internal.GrpcUtil;
@ -77,7 +69,6 @@ import io.grpc.internal.testing.TestClientStreamTracer;
import io.grpc.internal.testing.TestServerStreamTracer;
import io.grpc.internal.testing.TestStreamTracer;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.TestUtils;
@ -92,7 +83,6 @@ import io.grpc.testing.integration.Messages.StreamingInputCallRequest;
import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.grpc.testing.integration.Messages.TestOrcaReport;
import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
import io.opencensus.stats.Measure;
import io.opencensus.stats.Measure.MeasureDouble;
@ -118,7 +108,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
@ -191,11 +180,6 @@ public abstract class AbstractInteropTest {
private final LinkedBlockingQueue<ServerStreamTracerInfo> serverStreamTracers =
new LinkedBlockingQueue<>();
static final CallOptions.Key<AtomicReference<TestOrcaReport>>
ORCA_RPC_REPORT_KEY = CallOptions.Key.create("orca-rpc-report");
static final CallOptions.Key<AtomicReference<TestOrcaReport>>
ORCA_OOB_REPORT_KEY = CallOptions.Key.create("orca-oob-report");
private static final class ServerStreamTracerInfo {
final String fullMethodName;
final InteropServerStreamTracer tracer;
@ -451,47 +435,6 @@ public abstract class AbstractInteropTest {
assertEquals(EMPTY, TestServiceGrpc.newBlockingStub(channel).emptyCall(EMPTY));
}
/** Sends a cacheable unary rpc using GET. Requires that the server is behind a caching proxy. */
public void cacheableUnary() {
// THIS TEST IS BROKEN. Enabling safe just on the MethodDescriptor does nothing by itself. This
// test would need to enable GET on the channel.
// Set safe to true.
MethodDescriptor<SimpleRequest, SimpleResponse> safeCacheableUnaryCallMethod =
TestServiceGrpc.getCacheableUnaryCallMethod().toBuilder().setSafe(true).build();
// Set fake user IP since some proxies (GFE) won't cache requests from localhost.
Metadata.Key<String> userIpKey = Metadata.Key.of("x-user-ip", Metadata.ASCII_STRING_MARSHALLER);
Metadata metadata = new Metadata();
metadata.put(userIpKey, "1.2.3.4");
Channel channelWithUserIpKey =
ClientInterceptors.intercept(channel, MetadataUtils.newAttachHeadersInterceptor(metadata));
SimpleRequest requests1And2 =
SimpleRequest.newBuilder()
.setPayload(
Payload.newBuilder()
.setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime()))))
.build();
SimpleRequest request3 =
SimpleRequest.newBuilder()
.setPayload(
Payload.newBuilder()
.setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime()))))
.build();
SimpleResponse response1 =
ClientCalls.blockingUnaryCall(
channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, requests1And2);
SimpleResponse response2 =
ClientCalls.blockingUnaryCall(
channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, requests1And2);
SimpleResponse response3 =
ClientCalls.blockingUnaryCall(
channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, request3);
assertEquals(response1, response2);
assertNotEquals(response1, response3);
// THIS TEST IS BROKEN. See comment at start of method.
}
@Test
public void largeUnary() throws Exception {
assumeEnoughMemory();
@ -603,26 +546,6 @@ public abstract class AbstractInteropTest {
Collections.singleton(goldenResponse));
}
/**
* Assuming "pick_first" policy is used, tests that all requests are sent to the same server.
*/
public void pickFirstUnary() throws Exception {
SimpleRequest request = SimpleRequest.newBuilder()
.setResponseSize(1)
.setFillServerId(true)
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[1])))
.build();
SimpleResponse firstResponse = blockingStub.unaryCall(request);
// Increase the chance of all servers are connected, in case the channel should be doing
// round_robin instead.
Thread.sleep(5000);
for (int i = 0; i < 100; i++) {
SimpleResponse response = blockingStub.unaryCall(request);
assertThat(response.getServerId()).isEqualTo(firstResponse.getServerId());
}
}
@Test
public void serverStreaming() throws Exception {
final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
@ -1757,247 +1680,6 @@ public abstract class AbstractInteropTest {
assertNotNull(obtainLocalClientAddr());
}
/**
* Test backend metrics per query reporting: expect the test client LB policy to receive load
* reports.
*/
public void testOrcaPerRpc() throws Exception {
AtomicReference<TestOrcaReport> reportHolder = new AtomicReference<>();
TestOrcaReport answer = TestOrcaReport.newBuilder()
.setCpuUtilization(0.8210)
.setMemoryUtilization(0.5847)
.putRequestCost("cost", 3456.32)
.putUtilization("util", 0.30499)
.build();
blockingStub.withOption(ORCA_RPC_REPORT_KEY, reportHolder).unaryCall(
SimpleRequest.newBuilder().setOrcaPerQueryReport(answer).build());
assertThat(reportHolder.get()).isEqualTo(answer);
}
/**
* Test backend metrics OOB reporting: expect the test client LB policy to receive load reports.
*/
public void testOrcaOob() throws Exception {
AtomicReference<TestOrcaReport> reportHolder = new AtomicReference<>();
final TestOrcaReport answer = TestOrcaReport.newBuilder()
.setCpuUtilization(0.8210)
.setMemoryUtilization(0.5847)
.putUtilization("util", 0.30499)
.build();
final TestOrcaReport answer2 = TestOrcaReport.newBuilder()
.setCpuUtilization(0.29309)
.setMemoryUtilization(0.2)
.putUtilization("util", 0.2039)
.build();
final int retryLimit = 5;
BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
final Object lastItem = new Object();
StreamObserver<StreamingOutputCallRequest> streamObserver =
asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() {
@Override
public void onNext(StreamingOutputCallResponse value) {
queue.add(value);
}
@Override
public void onError(Throwable t) {
queue.add(t);
}
@Override
public void onCompleted() {
queue.add(lastItem);
}
});
streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
.setOrcaOobReport(answer)
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class);
int i = 0;
for (; i < retryLimit; i++) {
Thread.sleep(1000);
blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY);
if (answer.equals(reportHolder.get())) {
break;
}
}
assertThat(i).isLessThan(retryLimit);
streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
.setOrcaOobReport(answer2)
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class);
for (i = 0; i < retryLimit; i++) {
Thread.sleep(1000);
blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY);
if (reportHolder.get().equals(answer2)) {
break;
}
}
assertThat(i).isLessThan(retryLimit);
streamObserver.onCompleted();
assertThat(queue.take()).isSameInstanceAs(lastItem);
}
/** Sends a large unary rpc with service account credentials. */
public void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope)
throws Exception {
// cast to ServiceAccountCredentials to double-check the right type of object was created.
GoogleCredentials credentials =
ServiceAccountCredentials.class.cast(GoogleCredentials.fromStream(credentialsStream));
credentials = credentials.createScoped(Arrays.asList(authScope));
TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
.withCallCredentials(MoreCallCredentials.from(credentials));
final SimpleRequest request = SimpleRequest.newBuilder()
.setFillUsername(true)
.setFillOauthScope(true)
.setResponseSize(314159)
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[271828])))
.build();
final SimpleResponse response = stub.unaryCall(request);
assertFalse(response.getUsername().isEmpty());
assertTrue("Received username: " + response.getUsername(),
jsonKey.contains(response.getUsername()));
assertFalse(response.getOauthScope().isEmpty());
assertTrue("Received oauth scope: " + response.getOauthScope(),
authScope.contains(response.getOauthScope()));
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
.setOauthScope(response.getOauthScope())
.setUsername(response.getUsername())
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[314159])))
.build();
assertResponse(goldenResponse, response);
}
/** Sends a large unary rpc with compute engine credentials. */
public void computeEngineCreds(String serviceAccount, String oauthScope) throws Exception {
ComputeEngineCredentials credentials = ComputeEngineCredentials.create();
TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
.withCallCredentials(MoreCallCredentials.from(credentials));
final SimpleRequest request = SimpleRequest.newBuilder()
.setFillUsername(true)
.setFillOauthScope(true)
.setResponseSize(314159)
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[271828])))
.build();
final SimpleResponse response = stub.unaryCall(request);
assertEquals(serviceAccount, response.getUsername());
assertFalse(response.getOauthScope().isEmpty());
assertTrue("Received oauth scope: " + response.getOauthScope(),
oauthScope.contains(response.getOauthScope()));
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
.setOauthScope(response.getOauthScope())
.setUsername(response.getUsername())
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[314159])))
.build();
assertResponse(goldenResponse, response);
}
/** Sends an unary rpc with ComputeEngineChannelBuilder. */
public void computeEngineChannelCredentials(
String defaultServiceAccount,
TestServiceGrpc.TestServiceBlockingStub computeEngineStub) throws Exception {
final SimpleRequest request = SimpleRequest.newBuilder()
.setFillUsername(true)
.setResponseSize(314159)
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[271828])))
.build();
final SimpleResponse response = computeEngineStub.unaryCall(request);
assertEquals(defaultServiceAccount, response.getUsername());
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
.setUsername(defaultServiceAccount)
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[314159])))
.build();
assertResponse(goldenResponse, response);
}
/** Test JWT-based auth. */
public void jwtTokenCreds(InputStream serviceAccountJson) throws Exception {
final SimpleRequest request = SimpleRequest.newBuilder()
.setResponseSize(314159)
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[271828])))
.setFillUsername(true)
.build();
ServiceAccountCredentials credentials = (ServiceAccountCredentials)
GoogleCredentials.fromStream(serviceAccountJson);
TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
.withCallCredentials(MoreCallCredentials.from(credentials));
SimpleResponse response = stub.unaryCall(request);
assertEquals(credentials.getClientEmail(), response.getUsername());
assertEquals(314159, response.getPayload().getBody().size());
}
/** Sends a unary rpc with raw oauth2 access token credentials. */
public void oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope)
throws Exception {
GoogleCredentials utilCredentials =
GoogleCredentials.fromStream(credentialsStream);
utilCredentials = utilCredentials.createScoped(Arrays.asList(authScope));
AccessToken accessToken = utilCredentials.refreshAccessToken();
OAuth2Credentials credentials = OAuth2Credentials.create(accessToken);
TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
.withCallCredentials(MoreCallCredentials.from(credentials));
final SimpleRequest request = SimpleRequest.newBuilder()
.setFillUsername(true)
.setFillOauthScope(true)
.build();
final SimpleResponse response = stub.unaryCall(request);
assertFalse(response.getUsername().isEmpty());
assertTrue("Received username: " + response.getUsername(),
jsonKey.contains(response.getUsername()));
assertFalse(response.getOauthScope().isEmpty());
assertTrue("Received oauth scope: " + response.getOauthScope(),
authScope.contains(response.getOauthScope()));
}
/** Sends a unary rpc with "per rpc" raw oauth2 access token credentials. */
public void perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope)
throws Exception {
// In gRpc Java, we don't have per Rpc credentials, user can use an intercepted stub only once
// for that purpose.
// So, this test is identical to oauth2_auth_token test.
oauth2AuthToken(jsonKey, credentialsStream, oauthScope);
}
/** Sends an unary rpc with "google default credentials". */
public void googleDefaultCredentials(
String defaultServiceAccount,
TestServiceGrpc.TestServiceBlockingStub googleDefaultStub) throws Exception {
final SimpleRequest request = SimpleRequest.newBuilder()
.setFillUsername(true)
.setResponseSize(314159)
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[271828])))
.build();
final SimpleResponse response = googleDefaultStub.unaryCall(request);
assertEquals(defaultServiceAccount, response.getUsername());
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
.setUsername(defaultServiceAccount)
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[314159])))
.build();
assertResponse(goldenResponse, response);
}
private static class SoakIterationResult {
public SoakIterationResult(long latencyMs, Status status) {
this.latencyMs = latencyMs;
@ -2481,7 +2163,7 @@ public abstract class AbstractInteropTest {
}
}
private void assertResponse(SimpleResponse expected, SimpleResponse actual) {
public void assertResponse(SimpleResponse expected, SimpleResponse actual) {
assertPayload(expected.getPayload(), actual.getPayload());
assertEquals(expected.getUsername(), actual.getUsername());
assertEquals(expected.getOauthScope(), actual.getOauthScope());

View File

@ -16,8 +16,8 @@
package io.grpc.testing.integration;
import static io.grpc.testing.integration.AbstractInteropTest.ORCA_OOB_REPORT_KEY;
import static io.grpc.testing.integration.AbstractInteropTest.ORCA_RPC_REPORT_KEY;
import static io.grpc.testing.integration.TestServiceClient.ORCA_OOB_REPORT_KEY;
import static io.grpc.testing.integration.TestServiceClient.ORCA_RPC_REPORT_KEY;
import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;

View File

@ -16,10 +16,25 @@
package io.grpc.testing.integration;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.ComputeEngineCredentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ChannelCredentials;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.InsecureServerCredentials;
@ -28,11 +43,13 @@ import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerBuilder;
import io.grpc.TlsChannelCredentials;
import io.grpc.alts.AltsChannelCredentials;
import io.grpc.alts.ComputeEngineChannelCredentials;
import io.grpc.alts.GoogleDefaultChannelCredentials;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.JsonParser;
import io.grpc.netty.InsecureFromHttp1ChannelCredentials;
@ -40,13 +57,27 @@ import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.okhttp.InternalOkHttpChannelBuilder;
import io.grpc.okhttp.OkHttpChannelBuilder;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.TlsTesting;
import io.grpc.testing.integration.Messages.Payload;
import io.grpc.testing.integration.Messages.ResponseParameters;
import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.grpc.testing.integration.Messages.TestOrcaReport;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
/**
@ -57,6 +88,11 @@ public class TestServiceClient {
private static final Charset UTF_8 = Charset.forName("UTF-8");
static final CallOptions.Key<AtomicReference<TestOrcaReport>>
ORCA_RPC_REPORT_KEY = CallOptions.Key.create("orca-rpc-report");
static final CallOptions.Key<AtomicReference<TestOrcaReport>>
ORCA_OOB_REPORT_KEY = CallOptions.Key.create("orca-oob-report");
/**
* The main application allowing this client to be launched from the command line.
*/
@ -668,6 +704,313 @@ public class TestServiceClient {
return okBuilder.intercept(createCensusStatsClientInterceptor());
}
/**
* Assuming "pick_first" policy is used, tests that all requests are sent to the same server.
*/
public void pickFirstUnary() throws Exception {
SimpleRequest request = SimpleRequest.newBuilder()
.setResponseSize(1)
.setFillServerId(true)
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[1])))
.build();
SimpleResponse firstResponse = blockingStub.unaryCall(request);
// Increase the chance of all servers are connected, in case the channel should be doing
// round_robin instead.
Thread.sleep(5000);
for (int i = 0; i < 100; i++) {
SimpleResponse response = blockingStub.unaryCall(request);
assertThat(response.getServerId()).isEqualTo(firstResponse.getServerId());
}
}
/**
* Sends a cacheable unary rpc using GET. Requires that the server is behind a caching proxy.
*/
public void cacheableUnary() {
// THIS TEST IS BROKEN. Enabling safe just on the MethodDescriptor does nothing by itself.
// This test would need to enable GET on the channel.
// Set safe to true.
MethodDescriptor<SimpleRequest, SimpleResponse> safeCacheableUnaryCallMethod =
TestServiceGrpc.getCacheableUnaryCallMethod().toBuilder().setSafe(true).build();
// Set fake user IP since some proxies (GFE) won't cache requests from localhost.
Metadata.Key<String> userIpKey =
Metadata.Key.of("x-user-ip", Metadata.ASCII_STRING_MARSHALLER);
Metadata metadata = new Metadata();
metadata.put(userIpKey, "1.2.3.4");
Channel channelWithUserIpKey = ClientInterceptors.intercept(
channel, MetadataUtils.newAttachHeadersInterceptor(metadata));
SimpleRequest requests1And2 =
SimpleRequest.newBuilder()
.setPayload(
Payload.newBuilder()
.setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime()))))
.build();
SimpleRequest request3 =
SimpleRequest.newBuilder()
.setPayload(
Payload.newBuilder()
.setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime()))))
.build();
SimpleResponse response1 =
ClientCalls.blockingUnaryCall(
channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT,
requests1And2);
SimpleResponse response2 =
ClientCalls.blockingUnaryCall(
channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT,
requests1And2);
SimpleResponse response3 =
ClientCalls.blockingUnaryCall(
channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, request3);
assertEquals(response1, response2);
assertNotEquals(response1, response3);
// THIS TEST IS BROKEN. See comment at start of method.
}
/** Sends a large unary rpc with service account credentials. */
public void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope)
throws Exception {
// cast to ServiceAccountCredentials to double-check the right type of object was created.
GoogleCredentials credentials =
ServiceAccountCredentials.class.cast(GoogleCredentials.fromStream(credentialsStream));
credentials = credentials.createScoped(Arrays.asList(authScope));
TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
.withCallCredentials(MoreCallCredentials.from(credentials));
final SimpleRequest request = SimpleRequest.newBuilder()
.setFillUsername(true)
.setFillOauthScope(true)
.setResponseSize(314159)
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[271828])))
.build();
final SimpleResponse response = stub.unaryCall(request);
assertFalse(response.getUsername().isEmpty());
assertTrue("Received username: " + response.getUsername(),
jsonKey.contains(response.getUsername()));
assertFalse(response.getOauthScope().isEmpty());
assertTrue("Received oauth scope: " + response.getOauthScope(),
authScope.contains(response.getOauthScope()));
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
.setOauthScope(response.getOauthScope())
.setUsername(response.getUsername())
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[314159])))
.build();
assertResponse(goldenResponse, response);
}
/** Sends a large unary rpc with compute engine credentials. */
public void computeEngineCreds(String serviceAccount, String oauthScope) throws Exception {
ComputeEngineCredentials credentials = ComputeEngineCredentials.create();
TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
.withCallCredentials(MoreCallCredentials.from(credentials));
final SimpleRequest request = SimpleRequest.newBuilder()
.setFillUsername(true)
.setFillOauthScope(true)
.setResponseSize(314159)
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[271828])))
.build();
final SimpleResponse response = stub.unaryCall(request);
assertEquals(serviceAccount, response.getUsername());
assertFalse(response.getOauthScope().isEmpty());
assertTrue("Received oauth scope: " + response.getOauthScope(),
oauthScope.contains(response.getOauthScope()));
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
.setOauthScope(response.getOauthScope())
.setUsername(response.getUsername())
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[314159])))
.build();
assertResponse(goldenResponse, response);
}
/** Sends an unary rpc with ComputeEngineChannelBuilder. */
public void computeEngineChannelCredentials(
String defaultServiceAccount,
TestServiceGrpc.TestServiceBlockingStub computeEngineStub) throws Exception {
final SimpleRequest request = SimpleRequest.newBuilder()
.setFillUsername(true)
.setResponseSize(314159)
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[271828])))
.build();
final SimpleResponse response = computeEngineStub.unaryCall(request);
assertEquals(defaultServiceAccount, response.getUsername());
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
.setUsername(defaultServiceAccount)
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[314159])))
.build();
assertResponse(goldenResponse, response);
}
/** Test JWT-based auth. */
public void jwtTokenCreds(InputStream serviceAccountJson) throws Exception {
final SimpleRequest request = SimpleRequest.newBuilder()
.setResponseSize(314159)
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[271828])))
.setFillUsername(true)
.build();
ServiceAccountCredentials credentials = (ServiceAccountCredentials)
GoogleCredentials.fromStream(serviceAccountJson);
TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
.withCallCredentials(MoreCallCredentials.from(credentials));
SimpleResponse response = stub.unaryCall(request);
assertEquals(credentials.getClientEmail(), response.getUsername());
assertEquals(314159, response.getPayload().getBody().size());
}
/** Sends a unary rpc with raw oauth2 access token credentials. */
public void oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope)
throws Exception {
GoogleCredentials utilCredentials =
GoogleCredentials.fromStream(credentialsStream);
utilCredentials = utilCredentials.createScoped(Arrays.asList(authScope));
AccessToken accessToken = utilCredentials.refreshAccessToken();
OAuth2Credentials credentials = OAuth2Credentials.create(accessToken);
TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
.withCallCredentials(MoreCallCredentials.from(credentials));
final SimpleRequest request = SimpleRequest.newBuilder()
.setFillUsername(true)
.setFillOauthScope(true)
.build();
final SimpleResponse response = stub.unaryCall(request);
assertFalse(response.getUsername().isEmpty());
assertTrue("Received username: " + response.getUsername(),
jsonKey.contains(response.getUsername()));
assertFalse(response.getOauthScope().isEmpty());
assertTrue("Received oauth scope: " + response.getOauthScope(),
authScope.contains(response.getOauthScope()));
}
/** Sends a unary rpc with "per rpc" raw oauth2 access token credentials. */
public void perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope)
throws Exception {
// In gRpc Java, we don't have per Rpc credentials, user can use an intercepted stub only once
// for that purpose.
// So, this test is identical to oauth2_auth_token test.
oauth2AuthToken(jsonKey, credentialsStream, oauthScope);
}
/** Sends an unary rpc with "google default credentials". */
public void googleDefaultCredentials(
String defaultServiceAccount,
TestServiceGrpc.TestServiceBlockingStub googleDefaultStub) throws Exception {
final SimpleRequest request = SimpleRequest.newBuilder()
.setFillUsername(true)
.setResponseSize(314159)
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[271828])))
.build();
final SimpleResponse response = googleDefaultStub.unaryCall(request);
assertEquals(defaultServiceAccount, response.getUsername());
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
.setUsername(defaultServiceAccount)
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[314159])))
.build();
assertResponse(goldenResponse, response);
}
/**
* Test backend metrics per query reporting: expect the test client LB policy to receive load
* reports.
*/
public void testOrcaPerRpc() throws Exception {
AtomicReference<TestOrcaReport> reportHolder = new AtomicReference<>();
TestOrcaReport answer = TestOrcaReport.newBuilder()
.setCpuUtilization(0.8210)
.setMemoryUtilization(0.5847)
.putRequestCost("cost", 3456.32)
.putUtilization("util", 0.30499)
.build();
blockingStub.withOption(ORCA_RPC_REPORT_KEY, reportHolder).unaryCall(
SimpleRequest.newBuilder().setOrcaPerQueryReport(answer).build());
assertThat(reportHolder.get()).isEqualTo(answer);
}
/**
* Test backend metrics OOB reporting: expect the test client LB policy to receive load reports.
*/
public void testOrcaOob() throws Exception {
AtomicReference<TestOrcaReport> reportHolder = new AtomicReference<>();
final TestOrcaReport answer = TestOrcaReport.newBuilder()
.setCpuUtilization(0.8210)
.setMemoryUtilization(0.5847)
.putUtilization("util", 0.30499)
.build();
final TestOrcaReport answer2 = TestOrcaReport.newBuilder()
.setCpuUtilization(0.29309)
.setMemoryUtilization(0.2)
.putUtilization("util", 0.2039)
.build();
final int retryLimit = 5;
BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
final Object lastItem = new Object();
StreamObserver<StreamingOutputCallRequest> streamObserver =
asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() {
@Override
public void onNext(StreamingOutputCallResponse value) {
queue.add(value);
}
@Override
public void onError(Throwable t) {
queue.add(t);
}
@Override
public void onCompleted() {
queue.add(lastItem);
}
});
streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
.setOrcaOobReport(answer)
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class);
int i = 0;
for (; i < retryLimit; i++) {
Thread.sleep(1000);
blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY);
if (answer.equals(reportHolder.get())) {
break;
}
}
assertThat(i).isLessThan(retryLimit);
streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
.setOrcaOobReport(answer2)
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class);
for (i = 0; i < retryLimit; i++) {
Thread.sleep(1000);
blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY);
if (reportHolder.get().equals(answer2)) {
break;
}
}
assertThat(i).isLessThan(retryLimit);
streamObserver.onCompleted();
assertThat(queue.take()).isSameInstanceAs(lastItem);
}
@Override
protected boolean metricsExpected() {
// Exact message size doesn't match when testing with Go servers: