rls: Add metric test with real channel

This commit is contained in:
Eric Anderson 2024-05-07 07:55:58 -07:00
parent a639175c04
commit 3b6b1537d4
3 changed files with 133 additions and 28 deletions

View File

@ -0,0 +1,49 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* A MetricSink that discards all records.
*/
public class NoopMetricSink implements MetricSink {
private int size;
@Override
public Map<String, Boolean> getEnabledMetrics() {
return Collections.emptyMap();
}
@Override
public Set<String> getOptionalLabels() {
return Collections.emptySet();
}
@Override
public synchronized int getMeasuresSize() {
return size;
}
@Override
public synchronized void updateMeasures(List<MetricInstrument> instruments) {
size = instruments.size();
}
}

View File

@ -28,6 +28,7 @@ dependencies {
project(':grpc-inprocess'), project(':grpc-inprocess'),
project(':grpc-testing'), project(':grpc-testing'),
project(':grpc-testing-proto'), project(':grpc-testing-proto'),
testFixtures(project(':grpc-api')),
testFixtures(project(':grpc-core')) testFixtures(project(':grpc-core'))
signature libraries.signature.java signature libraries.signature.java
} }

View File

@ -18,6 +18,7 @@ package io.grpc.rls;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.argThat;
@ -50,15 +51,18 @@ import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener; import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.LongCounterMetricInstrument;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.MethodDescriptor.MethodType; import io.grpc.MethodDescriptor.MethodType;
import io.grpc.MetricInstrument;
import io.grpc.MetricRecorder; import io.grpc.MetricRecorder;
import io.grpc.MetricSink;
import io.grpc.NameResolver.ConfigOrError; import io.grpc.NameResolver.ConfigOrError;
import io.grpc.NoopMetricSink;
import io.grpc.ServerCall;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.Status.Code; import io.grpc.Status.Code;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
@ -68,16 +72,20 @@ import io.grpc.internal.FakeClock;
import io.grpc.internal.JsonParser; import io.grpc.internal.JsonParser;
import io.grpc.internal.PickFirstLoadBalancerProvider; import io.grpc.internal.PickFirstLoadBalancerProvider;
import io.grpc.internal.PickSubchannelArgsImpl; import io.grpc.internal.PickSubchannelArgsImpl;
import io.grpc.internal.testing.StreamRecorder;
import io.grpc.lookup.v1.RouteLookupServiceGrpc; import io.grpc.lookup.v1.RouteLookupServiceGrpc;
import io.grpc.rls.RlsLoadBalancer.CachingRlsLbClientBuilderProvider; import io.grpc.rls.RlsLoadBalancer.CachingRlsLbClientBuilderProvider;
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter; import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
import io.grpc.rls.RlsProtoData.RouteLookupRequest; import io.grpc.rls.RlsProtoData.RouteLookupRequest;
import io.grpc.rls.RlsProtoData.RouteLookupResponse; import io.grpc.rls.RlsProtoData.RouteLookupResponse;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule; import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.TestMethodDescriptors;
import java.io.IOException; import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Deque; import java.util.Deque;
import java.util.LinkedList; import java.util.LinkedList;
@ -126,12 +134,10 @@ public class RlsLoadBalancerTest {
private final Deque<FakeSubchannel> subchannels = new LinkedList<>(); private final Deque<FakeSubchannel> subchannels = new LinkedList<>();
private final FakeThrottler fakeThrottler = new FakeThrottler(); private final FakeThrottler fakeThrottler = new FakeThrottler();
private final String channelTarget = "channelTarget"; private final String channelTarget = "channelTarget";
@Mock
private Marshaller<Object> mockMarshaller;
@Captor @Captor
private ArgumentCaptor<SubchannelPicker> pickerCaptor; private ArgumentCaptor<SubchannelPicker> pickerCaptor;
private MethodDescriptor<Object, Object> fakeSearchMethod; private MethodDescriptor<Void, Void> fakeSearchMethod;
private MethodDescriptor<Object, Object> fakeRescueMethod; private MethodDescriptor<Void, Void> fakeRescueMethod;
private RlsLoadBalancer rlsLb; private RlsLoadBalancer rlsLb;
private String defaultTarget = "defaultTarget"; private String defaultTarget = "defaultTarget";
private PickSubchannelArgs searchSubchannelArgs; private PickSubchannelArgs searchSubchannelArgs;
@ -140,17 +146,17 @@ public class RlsLoadBalancerTest {
@Before @Before
public void setUp() { public void setUp() {
fakeSearchMethod = fakeSearchMethod =
MethodDescriptor.newBuilder() MethodDescriptor.<Void, Void>newBuilder()
.setFullMethodName("com.google/Search") .setFullMethodName("com.google/Search")
.setRequestMarshaller(mockMarshaller) .setRequestMarshaller(TestMethodDescriptors.voidMarshaller())
.setResponseMarshaller(mockMarshaller) .setResponseMarshaller(TestMethodDescriptors.voidMarshaller())
.setType(MethodType.CLIENT_STREAMING) .setType(MethodType.CLIENT_STREAMING)
.build(); .build();
fakeRescueMethod = fakeRescueMethod =
MethodDescriptor.newBuilder() MethodDescriptor.<Void, Void>newBuilder()
.setFullMethodName("com.google/Rescue") .setFullMethodName("com.google/Rescue")
.setRequestMarshaller(mockMarshaller) .setRequestMarshaller(TestMethodDescriptors.voidMarshaller())
.setResponseMarshaller(mockMarshaller) .setResponseMarshaller(TestMethodDescriptors.voidMarshaller())
.setType(MethodType.UNARY) .setType(MethodType.UNARY)
.build(); .build();
fakeRlsServerImpl.setLookupTable( fakeRlsServerImpl.setLookupTable(
@ -287,6 +293,44 @@ public class RlsLoadBalancerTest {
verifyNoMoreInteractions(mockMetricRecorder); verifyNoMoreInteractions(mockMetricRecorder);
} }
@Test
public void metricsWithRealChannel() throws Exception {
grpcCleanupRule.register(
InProcessServerBuilder.forName("fake-bigtable.googleapis.com")
.addService(ServerServiceDefinition.builder("com.google")
.addMethod(fakeSearchMethod, (call, headers) -> {
call.sendHeaders(new Metadata());
call.sendMessage(null);
call.close(Status.OK, new Metadata());
return new ServerCall.Listener<Void>() {};
})
.build())
.directExecutor()
.build()
.start());
MetricSink metrics = mock(MetricSink.class, delegatesTo(new NoopMetricSink()));
ManagedChannel channel = grpcCleanupRule.register(
InProcessChannelBuilder.forName("fake-bigtable.googleapis.com")
.defaultServiceConfig(parseJson(getServiceConfigJsonStr()))
.addMetricSink(metrics)
.directExecutor()
.build());
StreamRecorder<Void> recorder = StreamRecorder.create();
StreamObserver<Void> requestObserver = ClientCalls.asyncClientStreamingCall(
channel.newCall(fakeSearchMethod, CallOptions.DEFAULT), recorder);
requestObserver.onCompleted();
assertThat(recorder.awaitCompletion(10, TimeUnit.SECONDS)).isTrue();
assertThat(recorder.getError()).isNull();
verify(metrics).addLongCounter(
eqMetricInstrumentName("grpc.lb.rls.default_target_picks"),
eq(1L),
eq(Arrays.asList("directaddress:///fake-bigtable.googleapis.com", "localhost:8972",
"defaultTarget", "complete")),
eq(Arrays.asList()));
}
@Test @Test
public void lb_working_withoutDefaultTarget_noRlsResponse() throws Exception { public void lb_working_withoutDefaultTarget_noRlsResponse() throws Exception {
defaultTarget = ""; defaultTarget = "";
@ -511,7 +555,7 @@ public class RlsLoadBalancerTest {
private void deliverResolvedAddresses() throws Exception { private void deliverResolvedAddresses() throws Exception {
ConfigOrError parsedConfigOrError = ConfigOrError parsedConfigOrError =
provider.parseLoadBalancingPolicyConfig(getServiceConfig()); provider.parseLoadBalancingPolicyConfig(parseJson(getLbConfigJsonStr()));
assertThat(parsedConfigOrError.getConfig()).isNotNull(); assertThat(parsedConfigOrError.getConfig()).isNotNull();
rlsLb.acceptResolvedAddresses(ResolvedAddresses.newBuilder() rlsLb.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.of(new EquivalentAddressGroup(mock(SocketAddress.class)))) .setAddresses(ImmutableList.of(new EquivalentAddressGroup(mock(SocketAddress.class))))
@ -521,13 +565,24 @@ public class RlsLoadBalancerTest {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private Map<String, Object> getServiceConfig() throws IOException { private Map<String, Object> parseJson(String json) throws IOException {
String serviceConfig = "{" return (Map<String, Object>) JsonParser.parse(json);
}
private String getServiceConfigJsonStr() {
return "{"
+ " \"loadBalancingConfig\": [{"
+ " \"rls_experimental\": " + getLbConfigJsonStr()
+ " }]"
+ "}";
}
private String getLbConfigJsonStr() {
return "{"
+ " \"routeLookupConfig\": " + getRlsConfigJsonStr() + ", " + " \"routeLookupConfig\": " + getRlsConfigJsonStr() + ", "
+ " \"childPolicy\": [{\"pick_first\": {}}]," + " \"childPolicy\": [{\"pick_first\": {}}],"
+ " \"childPolicyConfigTargetFieldName\": \"serviceName\"" + " \"childPolicyConfigTargetFieldName\": \"serviceName\""
+ "}"; + "}";
return (Map<String, Object>) JsonParser.parse(serviceConfig);
} }
private String getRlsConfigJsonStr() { private String getRlsConfigJsonStr() {
@ -571,12 +626,7 @@ public class RlsLoadBalancerTest {
String dataPlaneTargetLabel, String pickResult) { String dataPlaneTargetLabel, String pickResult) {
// TODO: support the "grpc.target" label once available. // TODO: support the "grpc.target" label once available.
verify(mockMetricRecorder, times(times)).addLongCounter( verify(mockMetricRecorder, times(times)).addLongCounter(
argThat(new ArgumentMatcher<LongCounterMetricInstrument>() { eqMetricInstrumentName(name), eq(value),
@Override
public boolean matches(LongCounterMetricInstrument longCounterInstrument) {
return longCounterInstrument.getName().equals(name);
}
}), eq(value),
eq(Lists.newArrayList(channelTarget, "localhost:8972", dataPlaneTargetLabel, pickResult)), eq(Lists.newArrayList(channelTarget, "localhost:8972", dataPlaneTargetLabel, pickResult)),
eq(Lists.newArrayList())); eq(Lists.newArrayList()));
} }
@ -585,16 +635,21 @@ public class RlsLoadBalancerTest {
private void verifyFailedPicksCounterAdd(int times, long value) { private void verifyFailedPicksCounterAdd(int times, long value) {
// TODO: support the "grpc.target" label once available. // TODO: support the "grpc.target" label once available.
verify(mockMetricRecorder, times(times)).addLongCounter( verify(mockMetricRecorder, times(times)).addLongCounter(
argThat(new ArgumentMatcher<LongCounterMetricInstrument>() { eqMetricInstrumentName("grpc.lb.rls.failed_picks"), eq(value),
@Override
public boolean matches(LongCounterMetricInstrument longCounterInstrument) {
return longCounterInstrument.getName().equals("grpc.lb.rls.failed_picks");
}
}), eq(value),
eq(Lists.newArrayList(channelTarget, "localhost:8972")), eq(Lists.newArrayList(channelTarget, "localhost:8972")),
eq(Lists.newArrayList())); eq(Lists.newArrayList()));
} }
@SuppressWarnings("TypeParameterUnusedInFormals")
private <T extends MetricInstrument> T eqMetricInstrumentName(String name) {
return argThat(new ArgumentMatcher<T>() {
@Override
public boolean matches(T instrument) {
return instrument.getName().equals(name);
}
});
}
private PickSubchannelArgs newPickSubchannelArgs(MethodDescriptor<?, ?> method) { private PickSubchannelArgs newPickSubchannelArgs(MethodDescriptor<?, ?> method) {
return new PickSubchannelArgsImpl( return new PickSubchannelArgsImpl(
method, new Metadata(), CallOptions.DEFAULT, new PickDetailsConsumer() {}); method, new Metadata(), CallOptions.DEFAULT, new PickDetailsConsumer() {});