mirror of https://github.com/grpc/grpc-java.git
rls: Add metric test with real channel
This commit is contained in:
parent
6bede04d9f
commit
54ac06ae30
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright 2024 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* A MetricSink that discards all records.
|
||||
*/
|
||||
public class NoopMetricSink implements MetricSink {
|
||||
private int size;
|
||||
|
||||
@Override
|
||||
public Map<String, Boolean> getEnabledMetrics() {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getOptionalLabels() {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getMeasuresSize() {
|
||||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void updateMeasures(List<MetricInstrument> instruments) {
|
||||
size = instruments.size();
|
||||
}
|
||||
}
|
|
@ -28,6 +28,7 @@ dependencies {
|
|||
project(':grpc-inprocess'),
|
||||
project(':grpc-testing'),
|
||||
project(':grpc-testing-proto'),
|
||||
testFixtures(project(':grpc-api')),
|
||||
testFixtures(project(':grpc-core'))
|
||||
signature libraries.signature.java
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package io.grpc.rls;
|
|||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static org.mockito.AdditionalAnswers.delegatesTo;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
|
@ -50,15 +51,18 @@ import io.grpc.LoadBalancer.ResolvedAddresses;
|
|||
import io.grpc.LoadBalancer.Subchannel;
|
||||
import io.grpc.LoadBalancer.SubchannelPicker;
|
||||
import io.grpc.LoadBalancer.SubchannelStateListener;
|
||||
import io.grpc.LongCounterMetricInstrument;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.MethodDescriptor.Marshaller;
|
||||
import io.grpc.MethodDescriptor.MethodType;
|
||||
import io.grpc.MetricInstrument;
|
||||
import io.grpc.MetricRecorder;
|
||||
import io.grpc.MetricSink;
|
||||
import io.grpc.NameResolver.ConfigOrError;
|
||||
import io.grpc.NoopMetricSink;
|
||||
import io.grpc.ServerCall;
|
||||
import io.grpc.ServerServiceDefinition;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.Status.Code;
|
||||
import io.grpc.SynchronizationContext;
|
||||
|
@ -67,16 +71,20 @@ import io.grpc.inprocess.InProcessServerBuilder;
|
|||
import io.grpc.internal.FakeClock;
|
||||
import io.grpc.internal.JsonParser;
|
||||
import io.grpc.internal.PickSubchannelArgsImpl;
|
||||
import io.grpc.internal.testing.StreamRecorder;
|
||||
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
|
||||
import io.grpc.rls.RlsLoadBalancer.CachingRlsLbClientBuilderProvider;
|
||||
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
|
||||
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
|
||||
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
|
||||
import io.grpc.stub.ClientCalls;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import io.grpc.testing.GrpcCleanupRule;
|
||||
import io.grpc.testing.TestMethodDescriptors;
|
||||
import java.io.IOException;
|
||||
import java.lang.Thread.UncaughtExceptionHandler;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Deque;
|
||||
import java.util.LinkedList;
|
||||
|
@ -125,12 +133,10 @@ public class RlsLoadBalancerTest {
|
|||
private final Deque<FakeSubchannel> subchannels = new LinkedList<>();
|
||||
private final FakeThrottler fakeThrottler = new FakeThrottler();
|
||||
private final String channelTarget = "channelTarget";
|
||||
@Mock
|
||||
private Marshaller<Object> mockMarshaller;
|
||||
@Captor
|
||||
private ArgumentCaptor<SubchannelPicker> pickerCaptor;
|
||||
private MethodDescriptor<Object, Object> fakeSearchMethod;
|
||||
private MethodDescriptor<Object, Object> fakeRescueMethod;
|
||||
private MethodDescriptor<Void, Void> fakeSearchMethod;
|
||||
private MethodDescriptor<Void, Void> fakeRescueMethod;
|
||||
private RlsLoadBalancer rlsLb;
|
||||
private String defaultTarget = "defaultTarget";
|
||||
private PickSubchannelArgs searchSubchannelArgs;
|
||||
|
@ -139,17 +145,17 @@ public class RlsLoadBalancerTest {
|
|||
@Before
|
||||
public void setUp() {
|
||||
fakeSearchMethod =
|
||||
MethodDescriptor.newBuilder()
|
||||
MethodDescriptor.<Void, Void>newBuilder()
|
||||
.setFullMethodName("com.google/Search")
|
||||
.setRequestMarshaller(mockMarshaller)
|
||||
.setResponseMarshaller(mockMarshaller)
|
||||
.setRequestMarshaller(TestMethodDescriptors.voidMarshaller())
|
||||
.setResponseMarshaller(TestMethodDescriptors.voidMarshaller())
|
||||
.setType(MethodType.CLIENT_STREAMING)
|
||||
.build();
|
||||
fakeRescueMethod =
|
||||
MethodDescriptor.newBuilder()
|
||||
MethodDescriptor.<Void, Void>newBuilder()
|
||||
.setFullMethodName("com.google/Rescue")
|
||||
.setRequestMarshaller(mockMarshaller)
|
||||
.setResponseMarshaller(mockMarshaller)
|
||||
.setRequestMarshaller(TestMethodDescriptors.voidMarshaller())
|
||||
.setResponseMarshaller(TestMethodDescriptors.voidMarshaller())
|
||||
.setType(MethodType.UNARY)
|
||||
.build();
|
||||
fakeRlsServerImpl.setLookupTable(
|
||||
|
@ -282,6 +288,44 @@ public class RlsLoadBalancerTest {
|
|||
verifyNoMoreInteractions(mockMetricRecorder);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void metricsWithRealChannel() throws Exception {
|
||||
grpcCleanupRule.register(
|
||||
InProcessServerBuilder.forName("fake-bigtable.googleapis.com")
|
||||
.addService(ServerServiceDefinition.builder("com.google")
|
||||
.addMethod(fakeSearchMethod, (call, headers) -> {
|
||||
call.sendHeaders(new Metadata());
|
||||
call.sendMessage(null);
|
||||
call.close(Status.OK, new Metadata());
|
||||
return new ServerCall.Listener<Void>() {};
|
||||
})
|
||||
.build())
|
||||
.directExecutor()
|
||||
.build()
|
||||
.start());
|
||||
MetricSink metrics = mock(MetricSink.class, delegatesTo(new NoopMetricSink()));
|
||||
ManagedChannel channel = grpcCleanupRule.register(
|
||||
InProcessChannelBuilder.forName("fake-bigtable.googleapis.com")
|
||||
.defaultServiceConfig(parseJson(getServiceConfigJsonStr()))
|
||||
.addMetricSink(metrics)
|
||||
.directExecutor()
|
||||
.build());
|
||||
|
||||
StreamRecorder<Void> recorder = StreamRecorder.create();
|
||||
StreamObserver<Void> requestObserver = ClientCalls.asyncClientStreamingCall(
|
||||
channel.newCall(fakeSearchMethod, CallOptions.DEFAULT), recorder);
|
||||
requestObserver.onCompleted();
|
||||
assertThat(recorder.awaitCompletion(10, TimeUnit.SECONDS)).isTrue();
|
||||
assertThat(recorder.getError()).isNull();
|
||||
|
||||
verify(metrics).addLongCounter(
|
||||
eqMetricInstrumentName("grpc.lb.rls.default_target_picks"),
|
||||
eq(1L),
|
||||
eq(Arrays.asList("directaddress:///fake-bigtable.googleapis.com", "localhost:8972",
|
||||
"defaultTarget", "complete")),
|
||||
eq(Arrays.asList()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void lb_working_withoutDefaultTarget_noRlsResponse() throws Exception {
|
||||
defaultTarget = "";
|
||||
|
@ -498,7 +542,7 @@ public class RlsLoadBalancerTest {
|
|||
|
||||
private void deliverResolvedAddresses() throws Exception {
|
||||
ConfigOrError parsedConfigOrError =
|
||||
provider.parseLoadBalancingPolicyConfig(getServiceConfig());
|
||||
provider.parseLoadBalancingPolicyConfig(parseJson(getLbConfigJsonStr()));
|
||||
assertThat(parsedConfigOrError.getConfig()).isNotNull();
|
||||
rlsLb.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
|
||||
.setAddresses(ImmutableList.of(new EquivalentAddressGroup(mock(SocketAddress.class))))
|
||||
|
@ -508,13 +552,24 @@ public class RlsLoadBalancerTest {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Map<String, Object> getServiceConfig() throws IOException {
|
||||
String serviceConfig = "{"
|
||||
private Map<String, Object> parseJson(String json) throws IOException {
|
||||
return (Map<String, Object>) JsonParser.parse(json);
|
||||
}
|
||||
|
||||
private String getServiceConfigJsonStr() {
|
||||
return "{"
|
||||
+ " \"loadBalancingConfig\": [{"
|
||||
+ " \"rls_experimental\": " + getLbConfigJsonStr()
|
||||
+ " }]"
|
||||
+ "}";
|
||||
}
|
||||
|
||||
private String getLbConfigJsonStr() {
|
||||
return "{"
|
||||
+ " \"routeLookupConfig\": " + getRlsConfigJsonStr() + ", "
|
||||
+ " \"childPolicy\": [{\"pick_first\": {}}],"
|
||||
+ " \"childPolicyConfigTargetFieldName\": \"serviceName\""
|
||||
+ "}";
|
||||
return (Map<String, Object>) JsonParser.parse(serviceConfig);
|
||||
}
|
||||
|
||||
private String getRlsConfigJsonStr() {
|
||||
|
@ -558,12 +613,7 @@ public class RlsLoadBalancerTest {
|
|||
String dataPlaneTargetLabel, String pickResult) {
|
||||
// TODO: support the "grpc.target" label once available.
|
||||
verify(mockMetricRecorder, times(times)).addLongCounter(
|
||||
argThat(new ArgumentMatcher<LongCounterMetricInstrument>() {
|
||||
@Override
|
||||
public boolean matches(LongCounterMetricInstrument longCounterInstrument) {
|
||||
return longCounterInstrument.getName().equals(name);
|
||||
}
|
||||
}), eq(value),
|
||||
eqMetricInstrumentName(name), eq(value),
|
||||
eq(Lists.newArrayList(channelTarget, "localhost:8972", dataPlaneTargetLabel, pickResult)),
|
||||
eq(Lists.newArrayList()));
|
||||
}
|
||||
|
@ -572,16 +622,21 @@ public class RlsLoadBalancerTest {
|
|||
private void verifyFailedPicksCounterAdd(int times, long value) {
|
||||
// TODO: support the "grpc.target" label once available.
|
||||
verify(mockMetricRecorder, times(times)).addLongCounter(
|
||||
argThat(new ArgumentMatcher<LongCounterMetricInstrument>() {
|
||||
@Override
|
||||
public boolean matches(LongCounterMetricInstrument longCounterInstrument) {
|
||||
return longCounterInstrument.getName().equals("grpc.lb.rls.failed_picks");
|
||||
}
|
||||
}), eq(value),
|
||||
eqMetricInstrumentName("grpc.lb.rls.failed_picks"), eq(value),
|
||||
eq(Lists.newArrayList(channelTarget, "localhost:8972")),
|
||||
eq(Lists.newArrayList()));
|
||||
}
|
||||
|
||||
@SuppressWarnings("TypeParameterUnusedInFormals")
|
||||
private <T extends MetricInstrument> T eqMetricInstrumentName(String name) {
|
||||
return argThat(new ArgumentMatcher<T>() {
|
||||
@Override
|
||||
public boolean matches(T instrument) {
|
||||
return instrument.getName().equals(name);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private PickSubchannelArgs newPickSubchannelArgs(MethodDescriptor<?, ?> method) {
|
||||
return new PickSubchannelArgsImpl(
|
||||
method, new Metadata(), CallOptions.DEFAULT, new PickDetailsConsumer() {});
|
||||
|
|
Loading…
Reference in New Issue