mirror of https://github.com/grpc/grpc-java.git
Add MetricSink implementation for gRPC OpenTelemetry
This adds the following components that are required for gRPC A79 non-per-call metrics architecture. - MetricSink implementation for gRPC OpenTelemetry - Configurator for plumbing per call metrics ClientInterceptor and ServerStreamTracer.Factory via unified OpenTelemetryModule.
This commit is contained in:
parent
79bb5e540d
commit
d6f1a9d569
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Copyright 2024 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc;
|
||||
|
||||
/**
|
||||
* Provides hooks for modifying gRPC channels and servers during their construction.
|
||||
*/
|
||||
@Internal
|
||||
public interface Configurator {
|
||||
/**
|
||||
* Allows implementations to modify the channel builder.
|
||||
*
|
||||
* @param channelBuilder the channel builder being constructed
|
||||
*/
|
||||
default void configureChannelBuilder(ManagedChannelBuilder<?> channelBuilder) {}
|
||||
|
||||
/**
|
||||
* Allows implementations to modify the server builder.
|
||||
*
|
||||
* @param serverBuilder the server builder being constructed
|
||||
*/
|
||||
default void configureServerBuilder(ServerBuilder<?> serverBuilder) {}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Copyright 2024 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A registry for {@link Configurator} instances.
|
||||
*
|
||||
* <p>This class is responsible for maintaining a list of configurators and providing access to
|
||||
* them. The default registry can be obtained using {@link #getDefaultRegistry()}.
|
||||
*/
|
||||
@Internal
|
||||
public final class ConfiguratorRegistry {
|
||||
private static ConfiguratorRegistry instance;
|
||||
private static boolean isConfiguratorsSet;
|
||||
private List<Configurator> configurators = Collections.emptyList();
|
||||
|
||||
ConfiguratorRegistry() {}
|
||||
|
||||
/**
|
||||
* Returns the default global instance of the configurator registry.
|
||||
*/
|
||||
public static synchronized ConfiguratorRegistry getDefaultRegistry() {
|
||||
if (instance == null) {
|
||||
instance = new ConfiguratorRegistry();
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the configurators in this registry. This method can only be called once.
|
||||
*
|
||||
* @param configurators the configurators to set
|
||||
* @throws IllegalStateException if this method is called more than once
|
||||
*/
|
||||
public synchronized void setConfigurators(List<Configurator> configurators) {
|
||||
if (isConfiguratorsSet) {
|
||||
throw new IllegalStateException("Configurators are already set");
|
||||
}
|
||||
configurators = Collections.unmodifiableList(new ArrayList<>(configurators));
|
||||
isConfiguratorsSet = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a list of the configurators in this registry.
|
||||
*/
|
||||
public synchronized List<Configurator> getConfigurators() {
|
||||
return configurators;
|
||||
}
|
||||
}
|
|
@ -23,7 +23,7 @@ import java.util.List;
|
|||
*/
|
||||
@Internal
|
||||
public final class DoubleCounterMetricInstrument extends PartialMetricInstrument {
|
||||
DoubleCounterMetricInstrument(int index, String name, String description, String unit,
|
||||
public DoubleCounterMetricInstrument(int index, String name, String description, String unit,
|
||||
List<String> requiredLabelKeys, List<String> optionalLabelKeys, boolean enableByDefault) {
|
||||
super(index, name, description, unit, requiredLabelKeys, optionalLabelKeys, enableByDefault);
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ import java.util.List;
|
|||
public final class DoubleHistogramMetricInstrument extends PartialMetricInstrument {
|
||||
private final List<Double> bucketBoundaries;
|
||||
|
||||
DoubleHistogramMetricInstrument(int index, String name, String description, String unit,
|
||||
public DoubleHistogramMetricInstrument(int index, String name, String description, String unit,
|
||||
List<Double> bucketBoundaries, List<String> requiredLabelKeys, List<String> optionalLabelKeys,
|
||||
boolean enableByDefault) {
|
||||
super(index, name, description, unit, requiredLabelKeys, optionalLabelKeys, enableByDefault);
|
||||
|
|
|
@ -23,7 +23,7 @@ import java.util.List;
|
|||
*/
|
||||
@Internal
|
||||
public final class LongCounterMetricInstrument extends PartialMetricInstrument {
|
||||
LongCounterMetricInstrument(int index, String name, String description, String unit,
|
||||
public LongCounterMetricInstrument(int index, String name, String description, String unit,
|
||||
List<String> requiredLabelKeys, List<String> optionalLabelKeys, boolean enableByDefault) {
|
||||
super(index, name, description, unit, requiredLabelKeys, optionalLabelKeys, enableByDefault);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ import java.util.List;
|
|||
*/
|
||||
@Internal
|
||||
public final class LongGaugeMetricInstrument extends PartialMetricInstrument {
|
||||
LongGaugeMetricInstrument(int index, String name, String description, String unit,
|
||||
public LongGaugeMetricInstrument(int index, String name, String description, String unit,
|
||||
List<String> requiredLabelKeys, List<String> optionalLabelKeys, boolean enableByDefault) {
|
||||
super(index, name, description, unit, requiredLabelKeys, optionalLabelKeys, enableByDefault);
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ import java.util.List;
|
|||
public final class LongHistogramMetricInstrument extends PartialMetricInstrument {
|
||||
private final List<Long> bucketBoundaries;
|
||||
|
||||
LongHistogramMetricInstrument(int index, String name, String description, String unit,
|
||||
public LongHistogramMetricInstrument(int index, String name, String description, String unit,
|
||||
List<Long> bucketBoundaries, List<String> requiredLabelKeys, List<String> optionalLabelKeys,
|
||||
boolean enableByDefault) {
|
||||
super(index, name, description, unit, requiredLabelKeys, optionalLabelKeys, enableByDefault);
|
||||
|
|
|
@ -95,6 +95,7 @@ public final class MetricInstrumentRegistry {
|
|||
if (index + 1 == metricInstruments.length) {
|
||||
resizeMetricInstruments();
|
||||
}
|
||||
// TODO(dnvindhya): add limit for number of optional labels allowed
|
||||
DoubleCounterMetricInstrument instrument = new DoubleCounterMetricInstrument(
|
||||
index, name, description, unit, requiredLabelKeys, optionalLabelKeys,
|
||||
enableByDefault);
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package io.grpc;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
|
@ -26,50 +27,49 @@ import java.util.Set;
|
|||
public interface MetricSink {
|
||||
|
||||
/**
|
||||
* Returns a set of names for the metrics that are currently enabled for this `MetricSink`.
|
||||
* Returns a set of names for the metrics that are currently enabled or disabled.
|
||||
*
|
||||
* @return A set of enabled metric names.
|
||||
*/
|
||||
Set<String> getEnabledMetrics();
|
||||
Map<String, Boolean> getEnabledMetrics();
|
||||
|
||||
/**
|
||||
* Returns a list of label names that are considered optional for metrics collected by this
|
||||
* `MetricSink`.
|
||||
* Returns a set of optional label names for metrics that the sink actually wants.
|
||||
*
|
||||
* @return A list of optional label names.
|
||||
* @return A set of optional label names.
|
||||
*/
|
||||
List<String> getOptionalLabels();
|
||||
Set<String> getOptionalLabels();
|
||||
|
||||
/**
|
||||
* Returns a list of metric measures used to record metric values. These measures are created
|
||||
* Returns size of metric measures used to record metric values. These measures are created
|
||||
* based on registered metrics (via MetricInstrumentRegistry) and are ordered according to their
|
||||
* registration sequence.
|
||||
*
|
||||
* @return A list of metric measures.
|
||||
* @return Size of metric measures.
|
||||
*/
|
||||
List<Object> getMetricsMeasures();
|
||||
int getMeasuresSize();
|
||||
|
||||
/**
|
||||
* Records a value for a double-precision counter associated with specified metric instrument.
|
||||
* Adds a value for a double-precision counter associated with specified metric instrument.
|
||||
*
|
||||
* @param metricInstrument The counter metric instrument identifies metric measure to record.
|
||||
* @param metricInstrument The counter metric instrument identifies metric measure to add.
|
||||
* @param value The value to record.
|
||||
* @param requiredLabelValues A list of required label values for the metric.
|
||||
* @param optionalLabelValues A list of additional, optional label values for the metric.
|
||||
*/
|
||||
default void recordDoubleCounter(DoubleCounterMetricInstrument metricInstrument, double value,
|
||||
default void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, double value,
|
||||
List<String> requiredLabelValues, List<String> optionalLabelValues) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Records a value for a long valued counter metric associated with specified metric instrument.
|
||||
* Adds a value for a long valued counter metric associated with specified metric instrument.
|
||||
*
|
||||
* @param metricInstrument The counter metric instrument identifies metric measure to record.
|
||||
* @param metricInstrument The counter metric instrument identifies metric measure to add.
|
||||
* @param value The value to record.
|
||||
* @param requiredLabelValues A list of required label values for the metric.
|
||||
* @param optionalLabelValues A list of additional, optional label values for the metric.
|
||||
*/
|
||||
default void recordLongCounter(LongCounterMetricInstrument metricInstrument, long value,
|
||||
default void addLongCounter(LongCounterMetricInstrument metricInstrument, long value,
|
||||
List<String> requiredLabelValues, List<String> optionalLabelValues) {
|
||||
}
|
||||
|
||||
|
@ -99,5 +99,5 @@ public interface MetricSink {
|
|||
List<String> requiredLabelValues, List<String> optionalLabelValues) {
|
||||
}
|
||||
|
||||
default void updateMeasures(List<MetricInstrument> instruments) {}
|
||||
void updateMeasures(List<MetricInstrument> instruments);
|
||||
}
|
||||
|
|
|
@ -72,6 +72,8 @@ import io.grpc.ManagedChannel;
|
|||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.MetricInstrumentRegistry;
|
||||
import io.grpc.MetricRecorder;
|
||||
import io.grpc.NameResolver;
|
||||
import io.grpc.NameResolver.ConfigOrError;
|
||||
import io.grpc.NameResolver.ResolutionResult;
|
||||
|
@ -588,6 +590,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
|
||||
|
||||
private final Rescheduler idleTimer;
|
||||
private final MetricRecorder metricRecorder;
|
||||
|
||||
ManagedChannelImpl(
|
||||
ManagedChannelImplBuilder builder,
|
||||
|
@ -715,6 +718,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
}
|
||||
serviceConfigUpdated = true;
|
||||
}
|
||||
this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
|
||||
MetricInstrumentRegistry.getDefaultRegistry());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -1743,6 +1748,11 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
return nameResolverRegistry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricRecorder getMetricRecorder() {
|
||||
return metricRecorder;
|
||||
}
|
||||
|
||||
/**
|
||||
* A placeholder for channel creds if user did not specify channel creds for the channel.
|
||||
*/
|
||||
|
|
|
@ -30,12 +30,15 @@ import io.grpc.ChannelCredentials;
|
|||
import io.grpc.ClientInterceptor;
|
||||
import io.grpc.ClientTransportFilter;
|
||||
import io.grpc.CompressorRegistry;
|
||||
import io.grpc.Configurator;
|
||||
import io.grpc.ConfiguratorRegistry;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
import io.grpc.EquivalentAddressGroup;
|
||||
import io.grpc.InternalChannelz;
|
||||
import io.grpc.InternalGlobalInterceptors;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.MetricSink;
|
||||
import io.grpc.NameResolver;
|
||||
import io.grpc.NameResolverProvider;
|
||||
import io.grpc.NameResolverRegistry;
|
||||
|
@ -192,6 +195,7 @@ public final class ManagedChannelImplBuilder
|
|||
private boolean recordRealTimeMetrics = false;
|
||||
private boolean recordRetryMetrics = true;
|
||||
private boolean tracingEnabled = true;
|
||||
List<MetricSink> metricSinks = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* An interface for Transport implementors to provide the {@link ClientTransportFactory}
|
||||
|
@ -340,6 +344,10 @@ public final class ManagedChannelImplBuilder
|
|||
} else {
|
||||
this.channelBuilderDefaultPortProvider = new ManagedChannelDefaultPortProvider();
|
||||
}
|
||||
// TODO(dnvindhya): Move configurator to all the individual builders
|
||||
for (Configurator configurator : ConfiguratorRegistry.getDefaultRegistry().getConfigurators()) {
|
||||
configurator.configureChannelBuilder(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -661,6 +669,12 @@ public final class ManagedChannelImplBuilder
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ManagedChannelImplBuilder addMetricSink(MetricSink metricSink) {
|
||||
metricSinks.add(checkNotNull(metricSink, "metric sink"));
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ManagedChannel build() {
|
||||
return new ManagedChannelOrphanWrapper(new ManagedChannelImpl(
|
||||
|
@ -680,6 +694,7 @@ public final class ManagedChannelImplBuilder
|
|||
List<ClientInterceptor> getEffectiveInterceptors() {
|
||||
List<ClientInterceptor> effectiveInterceptors = new ArrayList<>(this.interceptors);
|
||||
boolean isGlobalInterceptorsSet = false;
|
||||
// TODO(dnvindhya) : Convert to Configurator
|
||||
List<ClientInterceptor> globalClientInterceptors =
|
||||
InternalGlobalInterceptors.getClientInterceptors();
|
||||
if (globalClientInterceptors != null) {
|
||||
|
|
|
@ -68,14 +68,14 @@ final class MetricRecorderImpl implements MetricRecorder {
|
|||
+ metricInstrument.getOptionalLabelKeys().size());
|
||||
for (MetricSink sink : metricSinks) {
|
||||
// TODO(dnvindhya): Move updating measures logic from sink to here
|
||||
List<Object> measures = sink.getMetricsMeasures();
|
||||
if (measures.size() <= metricInstrument.getIndex()) {
|
||||
int measuresSize = sink.getMeasuresSize();
|
||||
if (measuresSize <= metricInstrument.getIndex()) {
|
||||
// Measures may need updating in two cases:
|
||||
// 1. When the sink is initially created with an empty list of measures.
|
||||
// 2. When new metric instruments are registered, requiring the sink to accommodate them.
|
||||
sink.updateMeasures(registry.getMetricInstruments());
|
||||
}
|
||||
sink.recordDoubleCounter(metricInstrument, value, requiredLabelValues, optionalLabelValues);
|
||||
sink.addDoubleCounter(metricInstrument, value, requiredLabelValues, optionalLabelValues);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -99,14 +99,14 @@ final class MetricRecorderImpl implements MetricRecorder {
|
|||
"Incorrect number of optional labels provided. Expected: "
|
||||
+ metricInstrument.getOptionalLabelKeys().size());
|
||||
for (MetricSink sink : metricSinks) {
|
||||
List<Object> measures = sink.getMetricsMeasures();
|
||||
if (measures.size() <= metricInstrument.getIndex()) {
|
||||
int measuresSize = sink.getMeasuresSize();
|
||||
if (measuresSize <= metricInstrument.getIndex()) {
|
||||
// Measures may need updating in two cases:
|
||||
// 1. When the sink is initially created with an empty list of measures.
|
||||
// 2. When new metric instruments are registered, requiring the sink to accommodate them.
|
||||
sink.updateMeasures(registry.getMetricInstruments());
|
||||
}
|
||||
sink.recordLongCounter(metricInstrument, value, requiredLabelValues, optionalLabelValues);
|
||||
sink.addLongCounter(metricInstrument, value, requiredLabelValues, optionalLabelValues);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -130,8 +130,8 @@ final class MetricRecorderImpl implements MetricRecorder {
|
|||
"Incorrect number of optional labels provided. Expected: "
|
||||
+ metricInstrument.getOptionalLabelKeys().size());
|
||||
for (MetricSink sink : metricSinks) {
|
||||
List<Object> measures = sink.getMetricsMeasures();
|
||||
if (measures.size() <= metricInstrument.getIndex()) {
|
||||
int measuresSize = sink.getMeasuresSize();
|
||||
if (measuresSize <= metricInstrument.getIndex()) {
|
||||
// Measures may need updating in two cases:
|
||||
// 1. When the sink is initially created with an empty list of measures.
|
||||
// 2. When new metric instruments are registered, requiring the sink to accommodate them.
|
||||
|
@ -161,8 +161,8 @@ final class MetricRecorderImpl implements MetricRecorder {
|
|||
"Incorrect number of optional labels provided. Expected: "
|
||||
+ metricInstrument.getOptionalLabelKeys().size());
|
||||
for (MetricSink sink : metricSinks) {
|
||||
List<Object> measures = sink.getMetricsMeasures();
|
||||
if (measures.size() <= metricInstrument.getIndex()) {
|
||||
int measuresSize = sink.getMeasuresSize();
|
||||
if (measuresSize <= metricInstrument.getIndex()) {
|
||||
// Measures may need updating in two cases:
|
||||
// 1. When the sink is initially created with an empty list of measures.
|
||||
// 2. When new metric instruments are registered, requiring the sink to accommodate them.
|
||||
|
|
|
@ -25,6 +25,8 @@ import com.google.errorprone.annotations.DoNotCall;
|
|||
import io.grpc.BinaryLog;
|
||||
import io.grpc.BindableService;
|
||||
import io.grpc.CompressorRegistry;
|
||||
import io.grpc.Configurator;
|
||||
import io.grpc.ConfiguratorRegistry;
|
||||
import io.grpc.Context;
|
||||
import io.grpc.Deadline;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
|
@ -113,6 +115,10 @@ public final class ServerImplBuilder extends ServerBuilder<ServerImplBuilder> {
|
|||
public ServerImplBuilder(ClientTransportServersBuilder clientTransportServersBuilder) {
|
||||
this.clientTransportServersBuilder = checkNotNull(clientTransportServersBuilder,
|
||||
"clientTransportServersBuilder");
|
||||
// TODO(dnvindhya): Move configurator to all the individual builders
|
||||
for (Configurator configurator : ConfiguratorRegistry.getDefaultRegistry().getConfigurators()) {
|
||||
configurator.configureServerBuilder(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -39,6 +39,7 @@ import io.grpc.DecompressorRegistry;
|
|||
import io.grpc.InternalGlobalInterceptors;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.MetricSink;
|
||||
import io.grpc.NameResolver;
|
||||
import io.grpc.NameResolverRegistry;
|
||||
import io.grpc.StaticTestingClassLoader;
|
||||
|
@ -733,4 +734,12 @@ public class ManagedChannelImplBuilderTest {
|
|||
builder.disableServiceConfigLookUp();
|
||||
assertThat(builder.lookUpServiceConfig).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void metricSinks() {
|
||||
MetricSink mocksink = mock(MetricSink.class);
|
||||
builder.addMetricSink(mocksink);
|
||||
|
||||
assertThat(builder.metricSinks).contains(mocksink);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -97,10 +97,13 @@ import io.grpc.LoadBalancer.SubchannelPicker;
|
|||
import io.grpc.LoadBalancer.SubchannelStateListener;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.LoadBalancerRegistry;
|
||||
import io.grpc.LongCounterMetricInstrument;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.MethodDescriptor.MethodType;
|
||||
import io.grpc.MetricInstrumentRegistry;
|
||||
import io.grpc.MetricSink;
|
||||
import io.grpc.NameResolver;
|
||||
import io.grpc.NameResolver.ConfigOrError;
|
||||
import io.grpc.NameResolver.ResolutionResult;
|
||||
|
@ -223,6 +226,9 @@ public class ManagedChannelImplTest {
|
|||
|
||||
private final InternalChannelz channelz = new InternalChannelz();
|
||||
|
||||
private final MetricInstrumentRegistry metricInstrumentRegistry =
|
||||
MetricInstrumentRegistry.getDefaultRegistry();
|
||||
|
||||
@Rule public final MockitoRule mocks = MockitoJUnit.rule();
|
||||
|
||||
private ManagedChannelImpl channel;
|
||||
|
@ -655,6 +661,24 @@ public class ManagedChannelImplTest {
|
|||
verify(tracer).addOptionalLabel("routed", "perfectly");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void metricRecorder_recordsToMetricSink() {
|
||||
MetricSink mockSink = mock(MetricSink.class);
|
||||
channelBuilder.addMetricSink(mockSink);
|
||||
createChannel();
|
||||
|
||||
LongCounterMetricInstrument counter = metricInstrumentRegistry.registerLongCounter(
|
||||
"recorder_duration", "Time taken by metric recorder", "s",
|
||||
ImmutableList.of("grpc.method"), Collections.emptyList(), false);
|
||||
List<String> requiredLabelValues = ImmutableList.of("testMethod");
|
||||
List<String> optionalLabelValues = Collections.emptyList();
|
||||
|
||||
helper.getMetricRecorder()
|
||||
.addLongCounter(counter, 32, requiredLabelValues, optionalLabelValues);
|
||||
verify(mockSink).addLongCounter(eq(counter), eq(32L), eq(requiredLabelValues),
|
||||
eq(optionalLabelValues));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shutdownWithNoTransportsEverCreated() {
|
||||
channelBuilder.nameResolverFactory(
|
||||
|
|
|
@ -33,7 +33,6 @@ import io.grpc.MetricInstrumentRegistry;
|
|||
import io.grpc.MetricInstrumentRegistryAccessor;
|
||||
import io.grpc.MetricRecorder;
|
||||
import io.grpc.MetricSink;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -81,18 +80,17 @@ public class MetricRecorderImplTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void recordCounter() {
|
||||
when(mockSink.getMetricsMeasures()).thenReturn(
|
||||
Arrays.asList(new Object(), new Object(), new Object(), new Object()));
|
||||
public void addCounter() {
|
||||
when(mockSink.getMeasuresSize()).thenReturn(4);
|
||||
|
||||
recorder.addDoubleCounter(doubleCounterInstrument, 1.0, REQUIRED_LABEL_VALUES,
|
||||
OPTIONAL_LABEL_VALUES);
|
||||
verify(mockSink, times(2)).recordDoubleCounter(eq(doubleCounterInstrument), eq(1D),
|
||||
verify(mockSink, times(2)).addDoubleCounter(eq(doubleCounterInstrument), eq(1D),
|
||||
eq(REQUIRED_LABEL_VALUES), eq(OPTIONAL_LABEL_VALUES));
|
||||
|
||||
recorder.addLongCounter(longCounterInstrument, 1, REQUIRED_LABEL_VALUES,
|
||||
OPTIONAL_LABEL_VALUES);
|
||||
verify(mockSink, times(2)).recordLongCounter(eq(longCounterInstrument), eq(1L),
|
||||
verify(mockSink, times(2)).addLongCounter(eq(longCounterInstrument), eq(1L),
|
||||
eq(REQUIRED_LABEL_VALUES), eq(OPTIONAL_LABEL_VALUES));
|
||||
|
||||
verify(mockSink, never()).updateMeasures(registry.getMetricInstruments());
|
||||
|
@ -100,8 +98,7 @@ public class MetricRecorderImplTest {
|
|||
|
||||
@Test
|
||||
public void recordHistogram() {
|
||||
when(mockSink.getMetricsMeasures()).thenReturn(
|
||||
Arrays.asList(new Object(), new Object(), new Object(), new Object()));
|
||||
when(mockSink.getMeasuresSize()).thenReturn(4);
|
||||
|
||||
recorder.recordDoubleHistogram(doubleHistogramInstrument, 99.0, REQUIRED_LABEL_VALUES,
|
||||
OPTIONAL_LABEL_VALUES);
|
||||
|
@ -119,20 +116,20 @@ public class MetricRecorderImplTest {
|
|||
@Test
|
||||
public void newRegisteredMetricUpdateMeasures() {
|
||||
// Sink is initialized with zero measures, should trigger updateMeasures() on sinks
|
||||
when(mockSink.getMetricsMeasures()).thenReturn(new ArrayList<>());
|
||||
when(mockSink.getMeasuresSize()).thenReturn(0);
|
||||
|
||||
// Double Counter
|
||||
recorder.addDoubleCounter(doubleCounterInstrument, 1.0, REQUIRED_LABEL_VALUES,
|
||||
OPTIONAL_LABEL_VALUES);
|
||||
verify(mockSink, times(2)).updateMeasures(anyList());
|
||||
verify(mockSink, times(2)).recordDoubleCounter(eq(doubleCounterInstrument), eq(1D),
|
||||
verify(mockSink, times(2)).addDoubleCounter(eq(doubleCounterInstrument), eq(1D),
|
||||
eq(REQUIRED_LABEL_VALUES), eq(OPTIONAL_LABEL_VALUES));
|
||||
|
||||
// Long Counter
|
||||
recorder.addLongCounter(longCounterInstrument, 1, REQUIRED_LABEL_VALUES,
|
||||
OPTIONAL_LABEL_VALUES);
|
||||
verify(mockSink, times(4)).updateMeasures(anyList());
|
||||
verify(mockSink, times(2)).recordLongCounter(eq(longCounterInstrument), eq(1L),
|
||||
verify(mockSink, times(2)).addLongCounter(eq(longCounterInstrument), eq(1L),
|
||||
eq(REQUIRED_LABEL_VALUES), eq(OPTIONAL_LABEL_VALUES));
|
||||
|
||||
// Double Histogram
|
||||
|
@ -151,18 +148,16 @@ public class MetricRecorderImplTest {
|
|||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void recordDoubleCounterMismatchedRequiredLabelValues() {
|
||||
when(mockSink.getMetricsMeasures()).thenReturn(
|
||||
Arrays.asList(new Object(), new Object(), new Object(), new Object()));
|
||||
public void addDoubleCounterMismatchedRequiredLabelValues() {
|
||||
when(mockSink.getMeasuresSize()).thenReturn(4);
|
||||
|
||||
recorder.addDoubleCounter(doubleCounterInstrument, 1.0, ImmutableList.of(),
|
||||
OPTIONAL_LABEL_VALUES);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void recordLongCounterMismatchedRequiredLabelValues() {
|
||||
when(mockSink.getMetricsMeasures()).thenReturn(
|
||||
Arrays.asList(new Object(), new Object(), new Object(), new Object()));
|
||||
public void addLongCounterMismatchedRequiredLabelValues() {
|
||||
when(mockSink.getMeasuresSize()).thenReturn(4);
|
||||
|
||||
recorder.addLongCounter(longCounterInstrument, 1, ImmutableList.of(),
|
||||
OPTIONAL_LABEL_VALUES);
|
||||
|
@ -170,8 +165,7 @@ public class MetricRecorderImplTest {
|
|||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void recordDoubleHistogramMismatchedRequiredLabelValues() {
|
||||
when(mockSink.getMetricsMeasures()).thenReturn(
|
||||
Arrays.asList(new Object(), new Object(), new Object(), new Object()));
|
||||
when(mockSink.getMeasuresSize()).thenReturn(4);
|
||||
|
||||
recorder.recordDoubleHistogram(doubleHistogramInstrument, 99.0, ImmutableList.of(),
|
||||
OPTIONAL_LABEL_VALUES);
|
||||
|
@ -179,26 +173,23 @@ public class MetricRecorderImplTest {
|
|||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void recordLongHistogramMismatchedRequiredLabelValues() {
|
||||
when(mockSink.getMetricsMeasures()).thenReturn(
|
||||
Arrays.asList(new Object(), new Object(), new Object(), new Object()));
|
||||
when(mockSink.getMeasuresSize()).thenReturn(4);
|
||||
|
||||
recorder.recordLongHistogram(longHistogramInstrument, 99, ImmutableList.of(),
|
||||
OPTIONAL_LABEL_VALUES);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void recordDoubleCounterMismatchedOptionalLabelValues() {
|
||||
when(mockSink.getMetricsMeasures()).thenReturn(
|
||||
Arrays.asList(new Object(), new Object(), new Object(), new Object()));
|
||||
public void addDoubleCounterMismatchedOptionalLabelValues() {
|
||||
when(mockSink.getMeasuresSize()).thenReturn(4);
|
||||
|
||||
recorder.addDoubleCounter(doubleCounterInstrument, 1.0, REQUIRED_LABEL_VALUES,
|
||||
ImmutableList.of());
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void recordLongCounterMismatchedOptionalLabelValues() {
|
||||
when(mockSink.getMetricsMeasures()).thenReturn(
|
||||
Arrays.asList(new Object(), new Object(), new Object(), new Object()));
|
||||
public void addLongCounterMismatchedOptionalLabelValues() {
|
||||
when(mockSink.getMeasuresSize()).thenReturn(4);
|
||||
|
||||
recorder.addLongCounter(longCounterInstrument, 1, REQUIRED_LABEL_VALUES,
|
||||
ImmutableList.of());
|
||||
|
@ -206,8 +197,7 @@ public class MetricRecorderImplTest {
|
|||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void recordDoubleHistogramMismatchedOptionalLabelValues() {
|
||||
when(mockSink.getMetricsMeasures()).thenReturn(
|
||||
Arrays.asList(new Object(), new Object(), new Object(), new Object()));
|
||||
when(mockSink.getMeasuresSize()).thenReturn(4);
|
||||
|
||||
recorder.recordDoubleHistogram(doubleHistogramInstrument, 99.0, REQUIRED_LABEL_VALUES,
|
||||
ImmutableList.of());
|
||||
|
@ -215,8 +205,7 @@ public class MetricRecorderImplTest {
|
|||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void recordLongHistogramMismatchedOptionalLabelValues() {
|
||||
when(mockSink.getMetricsMeasures()).thenReturn(
|
||||
Arrays.asList(new Object(), new Object(), new Object(), new Object()));
|
||||
when(mockSink.getMeasuresSize()).thenReturn(4);
|
||||
|
||||
recorder.recordLongHistogram(longHistogramInstrument, 99, REQUIRED_LABEL_VALUES,
|
||||
ImmutableList.of());
|
||||
|
|
|
@ -0,0 +1,263 @@
|
|||
/*
|
||||
* Copyright 2024 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.opentelemetry;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import io.grpc.DoubleCounterMetricInstrument;
|
||||
import io.grpc.DoubleHistogramMetricInstrument;
|
||||
import io.grpc.LongCounterMetricInstrument;
|
||||
import io.grpc.LongHistogramMetricInstrument;
|
||||
import io.grpc.MetricInstrument;
|
||||
import io.grpc.MetricSink;
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
import io.opentelemetry.api.common.AttributesBuilder;
|
||||
import io.opentelemetry.api.metrics.DoubleCounter;
|
||||
import io.opentelemetry.api.metrics.DoubleHistogram;
|
||||
import io.opentelemetry.api.metrics.LongCounter;
|
||||
import io.opentelemetry.api.metrics.LongHistogram;
|
||||
import io.opentelemetry.api.metrics.Meter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
final class OpenTelemetryMetricSink implements MetricSink {
|
||||
private static final Logger logger = Logger.getLogger(OpenTelemetryMetricSink.class.getName());
|
||||
private final Object lock = new Object();
|
||||
private final Meter openTelemetryMeter;
|
||||
private final Map<String, Boolean> enableMetrics;
|
||||
private final boolean disableDefaultMetrics;
|
||||
private final Set<String> optionalLabels;
|
||||
private volatile List<MeasuresData> measures = new ArrayList<>();
|
||||
|
||||
OpenTelemetryMetricSink(Meter meter, Map<String, Boolean> enableMetrics,
|
||||
boolean disableDefaultMetrics, List<String> optionalLabels) {
|
||||
this.openTelemetryMeter = checkNotNull(meter, "meter");
|
||||
this.enableMetrics = ImmutableMap.copyOf(enableMetrics);
|
||||
this.disableDefaultMetrics = disableDefaultMetrics;
|
||||
this.optionalLabels = ImmutableSet.copyOf(optionalLabels);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Boolean> getEnabledMetrics() {
|
||||
return enableMetrics;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getOptionalLabels() {
|
||||
return optionalLabels;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMeasuresSize() {
|
||||
return measures.size();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
List<MeasuresData> getMeasures() {
|
||||
synchronized (lock) {
|
||||
return Collections.unmodifiableList(measures);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, double value,
|
||||
List<String> requiredLabelValues, List<String> optionalLabelValues) {
|
||||
MeasuresData instrumentData = measures.get(metricInstrument.getIndex());
|
||||
if (instrumentData == null) {
|
||||
// Disabled metric
|
||||
return;
|
||||
}
|
||||
Attributes attributes = createAttributes(metricInstrument.getRequiredLabelKeys(),
|
||||
metricInstrument.getOptionalLabelKeys(), requiredLabelValues, optionalLabelValues,
|
||||
instrumentData.getOptionalLabelsBitSet());
|
||||
DoubleCounter counter = (DoubleCounter) instrumentData.getMeasure();
|
||||
counter.add(value, attributes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addLongCounter(LongCounterMetricInstrument metricInstrument, long value,
|
||||
List<String> requiredLabelValues, List<String> optionalLabelValues) {
|
||||
MeasuresData instrumentData = measures.get(metricInstrument.getIndex());
|
||||
if (instrumentData == null) {
|
||||
// Disabled metric
|
||||
return;
|
||||
}
|
||||
Attributes attributes = createAttributes(metricInstrument.getRequiredLabelKeys(),
|
||||
metricInstrument.getOptionalLabelKeys(), requiredLabelValues, optionalLabelValues,
|
||||
instrumentData.getOptionalLabelsBitSet());
|
||||
LongCounter counter = (LongCounter) instrumentData.getMeasure();
|
||||
counter.add(value, attributes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recordDoubleHistogram(DoubleHistogramMetricInstrument metricInstrument, double value,
|
||||
List<String> requiredLabelValues, List<String> optionalLabelValues) {
|
||||
MeasuresData instrumentData = measures.get(metricInstrument.getIndex());
|
||||
if (instrumentData == null) {
|
||||
// Disabled metric
|
||||
return;
|
||||
}
|
||||
Attributes attributes = createAttributes(metricInstrument.getRequiredLabelKeys(),
|
||||
metricInstrument.getOptionalLabelKeys(), requiredLabelValues, optionalLabelValues,
|
||||
instrumentData.getOptionalLabelsBitSet());
|
||||
DoubleHistogram histogram = (DoubleHistogram) instrumentData.getMeasure();
|
||||
histogram.record(value, attributes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recordLongHistogram(LongHistogramMetricInstrument metricInstrument, long value,
|
||||
List<String> requiredLabelValues, List<String> optionalLabelValues) {
|
||||
MeasuresData instrumentData = measures.get(metricInstrument.getIndex());
|
||||
if (instrumentData == null) {
|
||||
// Disabled metric
|
||||
return;
|
||||
}
|
||||
Attributes attributes = createAttributes(metricInstrument.getRequiredLabelKeys(),
|
||||
metricInstrument.getOptionalLabelKeys(), requiredLabelValues, optionalLabelValues,
|
||||
instrumentData.getOptionalLabelsBitSet());
|
||||
LongHistogram histogram = (LongHistogram) instrumentData.getMeasure();
|
||||
histogram.record(value, attributes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateMeasures(List<MetricInstrument> instruments) {
|
||||
synchronized (lock) {
|
||||
if (measures.size() >= instruments.size()) {
|
||||
// Already up-to-date
|
||||
return;
|
||||
}
|
||||
|
||||
List<MeasuresData> newMeasures = new ArrayList<>(instruments.size());
|
||||
// Reuse existing measures
|
||||
newMeasures.addAll(measures);
|
||||
|
||||
for (int i = measures.size(); i < instruments.size(); i++) {
|
||||
MetricInstrument instrument = instruments.get(i);
|
||||
// Check if the metric is disabled
|
||||
if (!shouldEnableMetric(instrument)) {
|
||||
// Adding null measure for disabled Metric
|
||||
newMeasures.add(null);
|
||||
continue;
|
||||
}
|
||||
|
||||
BitSet bitSet = new BitSet(instrument.getOptionalLabelKeys().size());
|
||||
if (optionalLabels.isEmpty()) {
|
||||
// initialize an empty list
|
||||
} else {
|
||||
List<String> labels = instrument.getOptionalLabelKeys();
|
||||
for (int j = 0; j < labels.size(); j++) {
|
||||
if (optionalLabels.contains(labels.get(j))) {
|
||||
bitSet.set(j);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int index = instrument.getIndex();
|
||||
String name = instrument.getName();
|
||||
String unit = instrument.getUnit();
|
||||
String description = instrument.getDescription();
|
||||
|
||||
Object openTelemetryMeasure;
|
||||
if (instrument instanceof DoubleCounterMetricInstrument) {
|
||||
openTelemetryMeasure = openTelemetryMeter.counterBuilder(name)
|
||||
.setUnit(unit)
|
||||
.setDescription(description)
|
||||
.ofDoubles()
|
||||
.build();
|
||||
} else if (instrument instanceof LongCounterMetricInstrument) {
|
||||
openTelemetryMeasure = openTelemetryMeter.counterBuilder(name)
|
||||
.setUnit(unit)
|
||||
.setDescription(description)
|
||||
.build();
|
||||
} else if (instrument instanceof DoubleHistogramMetricInstrument) {
|
||||
openTelemetryMeasure = openTelemetryMeter.histogramBuilder(name)
|
||||
.setUnit(unit)
|
||||
.setDescription(description)
|
||||
.build();
|
||||
} else if (instrument instanceof LongHistogramMetricInstrument) {
|
||||
openTelemetryMeasure = openTelemetryMeter.histogramBuilder(name)
|
||||
.setUnit(unit)
|
||||
.setDescription(description)
|
||||
.ofLongs()
|
||||
.build();
|
||||
} else {
|
||||
logger.log(Level.FINE, "Unsupported metric instrument type : {0}", instrument);
|
||||
openTelemetryMeasure = null;
|
||||
}
|
||||
newMeasures.add(index, new MeasuresData(bitSet, openTelemetryMeasure));
|
||||
}
|
||||
|
||||
measures = newMeasures;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean shouldEnableMetric(MetricInstrument instrument) {
|
||||
Boolean explicitlyEnabled = enableMetrics.get(instrument.getName());
|
||||
if (explicitlyEnabled != null) {
|
||||
return explicitlyEnabled;
|
||||
}
|
||||
return instrument.isEnableByDefault() && !disableDefaultMetrics;
|
||||
}
|
||||
|
||||
|
||||
private Attributes createAttributes(List<String> requiredLabelKeys,
|
||||
List<String> optionalLabelKeys,
|
||||
List<String> requiredLabelValues, List<String> optionalLabelValues, BitSet bitSet) {
|
||||
AttributesBuilder builder = Attributes.builder();
|
||||
// Required Labels
|
||||
for (int i = 0; i < requiredLabelKeys.size(); i++) {
|
||||
builder.put(requiredLabelKeys.get(i), requiredLabelValues.get(i));
|
||||
}
|
||||
// Optional labels
|
||||
for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
|
||||
if (i == Integer.MAX_VALUE) {
|
||||
break; // or (i+1) would overflow
|
||||
}
|
||||
builder.put(optionalLabelKeys.get(i), optionalLabelValues.get(i));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
||||
static final class MeasuresData {
|
||||
final BitSet optionalLabelsIndices;
|
||||
final Object measure;
|
||||
|
||||
MeasuresData(BitSet optionalLabelsIndices, Object measure) {
|
||||
this.optionalLabelsIndices = optionalLabelsIndices;
|
||||
this.measure = measure;
|
||||
}
|
||||
|
||||
public BitSet getOptionalLabelsBitSet() {
|
||||
return optionalLabelsIndices;
|
||||
}
|
||||
|
||||
public Object getMeasure() {
|
||||
return measure;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -23,6 +23,7 @@ import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientCall;
|
||||
|
@ -61,6 +62,17 @@ import javax.annotation.concurrent.GuardedBy;
|
|||
*/
|
||||
final class OpenTelemetryMetricsModule {
|
||||
private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
|
||||
public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
|
||||
ImmutableSet.of(
|
||||
"grpc.client.attempt.started",
|
||||
"grpc.client.attempt.duration",
|
||||
"grpc.client.attempt.sent_total_compressed_message_size",
|
||||
"grpc.client.attempt.rcvd_total_compressed_message_size",
|
||||
"grpc.client.call.duration",
|
||||
"grpc.server.call.started",
|
||||
"grpc.server.call.duration",
|
||||
"grpc.server.call.sent_total_compressed_message_size",
|
||||
"grpc.server.call.rcvd_total_compressed_message_size");
|
||||
|
||||
// Using floating point because TimeUnit.NANOSECONDS.toSeconds would discard
|
||||
// fractional seconds.
|
||||
|
@ -182,12 +194,18 @@ final class OpenTelemetryMetricsModule {
|
|||
io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
|
||||
STATUS_KEY, statusCode.toString());
|
||||
|
||||
module.resource.clientAttemptDurationCounter()
|
||||
.record(attemptNanos * SECONDS_PER_NANO, attribute);
|
||||
module.resource.clientTotalSentCompressedMessageSizeCounter()
|
||||
.record(outboundWireSize, attribute);
|
||||
module.resource.clientTotalReceivedCompressedMessageSizeCounter()
|
||||
.record(inboundWireSize, attribute);
|
||||
if (module.resource.clientAttemptCountCounter() != null ) {
|
||||
module.resource.clientAttemptDurationCounter()
|
||||
.record(attemptNanos * SECONDS_PER_NANO, attribute);
|
||||
}
|
||||
if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
|
||||
module.resource.clientTotalSentCompressedMessageSizeCounter()
|
||||
.record(outboundWireSize, attribute);
|
||||
}
|
||||
if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
|
||||
module.resource.clientTotalReceivedCompressedMessageSizeCounter()
|
||||
.record(inboundWireSize, attribute);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -219,7 +237,9 @@ final class OpenTelemetryMetricsModule {
|
|||
io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName);
|
||||
|
||||
// Record here in case mewClientStreamTracer() would never be called.
|
||||
module.resource.clientAttemptCountCounter().add(1, attribute);
|
||||
if (module.resource.clientAttemptCountCounter() != null) {
|
||||
module.resource.clientAttemptCountCounter().add(1, attribute);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -240,7 +260,9 @@ final class OpenTelemetryMetricsModule {
|
|||
// TODO(dnvindhya): Add target as an attribute
|
||||
io.opentelemetry.api.common.Attributes attribute =
|
||||
io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName);
|
||||
module.resource.clientAttemptCountCounter().add(1, attribute);
|
||||
if (module.resource.clientAttemptCountCounter() != null) {
|
||||
module.resource.clientAttemptCountCounter().add(1, attribute);
|
||||
}
|
||||
}
|
||||
if (!info.isTransparentRetry()) {
|
||||
attemptsPerCall.incrementAndGet();
|
||||
|
@ -298,8 +320,10 @@ final class OpenTelemetryMetricsModule {
|
|||
io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
|
||||
STATUS_KEY, status.getCode().toString());
|
||||
|
||||
module.resource.clientCallDurationCounter()
|
||||
.record(callLatencyNanos * SECONDS_PER_NANO, attribute);
|
||||
if (module.resource.clientCallDurationCounter() != null) {
|
||||
module.resource.clientCallDurationCounter()
|
||||
.record(callLatencyNanos * SECONDS_PER_NANO, attribute);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -360,7 +384,9 @@ final class OpenTelemetryMetricsModule {
|
|||
io.opentelemetry.api.common.Attributes.of(
|
||||
METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));
|
||||
|
||||
module.resource.serverCallCountCounter().add(1, attribute);
|
||||
if (module.resource.serverCallCountCounter() != null) {
|
||||
module.resource.serverCallCountCounter().add(1, attribute);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -408,12 +434,18 @@ final class OpenTelemetryMetricsModule {
|
|||
METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod),
|
||||
STATUS_KEY, status.getCode().toString());
|
||||
|
||||
module.resource.serverCallDurationCounter()
|
||||
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes);
|
||||
module.resource.serverTotalSentCompressedMessageSizeCounter()
|
||||
.record(outboundWireSize, attributes);
|
||||
module.resource.serverTotalReceivedCompressedMessageSizeCounter()
|
||||
.record(inboundWireSize, attributes);
|
||||
if (module.resource.serverCallDurationCounter() != null) {
|
||||
module.resource.serverCallDurationCounter()
|
||||
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes);
|
||||
}
|
||||
if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
|
||||
module.resource.serverTotalSentCompressedMessageSizeCounter()
|
||||
.record(outboundWireSize, attributes);
|
||||
}
|
||||
if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
|
||||
module.resource.serverTotalReceivedCompressedMessageSizeCounter()
|
||||
.record(inboundWireSize, attributes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,29 +20,39 @@ import com.google.auto.value.AutoValue;
|
|||
import io.opentelemetry.api.metrics.DoubleHistogram;
|
||||
import io.opentelemetry.api.metrics.LongCounter;
|
||||
import io.opentelemetry.api.metrics.LongHistogram;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
@AutoValue
|
||||
abstract class OpenTelemetryMetricsResource {
|
||||
|
||||
/* Client Metrics */
|
||||
@Nullable
|
||||
abstract DoubleHistogram clientCallDurationCounter();
|
||||
|
||||
@Nullable
|
||||
abstract LongCounter clientAttemptCountCounter();
|
||||
|
||||
@Nullable
|
||||
abstract DoubleHistogram clientAttemptDurationCounter();
|
||||
|
||||
@Nullable
|
||||
abstract LongHistogram clientTotalSentCompressedMessageSizeCounter();
|
||||
|
||||
@Nullable
|
||||
abstract LongHistogram clientTotalReceivedCompressedMessageSizeCounter();
|
||||
|
||||
|
||||
/* Server Metrics */
|
||||
@Nullable
|
||||
abstract LongCounter serverCallCountCounter();
|
||||
|
||||
@Nullable
|
||||
abstract DoubleHistogram serverCallDurationCounter();
|
||||
|
||||
@Nullable
|
||||
abstract LongHistogram serverTotalSentCompressedMessageSizeCounter();
|
||||
|
||||
@Nullable
|
||||
abstract LongHistogram serverTotalReceivedCompressedMessageSizeCounter();
|
||||
|
||||
static Builder builder() {
|
||||
|
|
|
@ -22,13 +22,24 @@ import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Supplier;
|
||||
import io.grpc.ClientInterceptor;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.grpc.Configurator;
|
||||
import io.grpc.ConfiguratorRegistry;
|
||||
import io.grpc.ExperimentalApi;
|
||||
import io.grpc.ServerStreamTracer;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.MetricSink;
|
||||
import io.grpc.ServerBuilder;
|
||||
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
|
||||
import io.opentelemetry.api.OpenTelemetry;
|
||||
import io.opentelemetry.api.metrics.Meter;
|
||||
import io.opentelemetry.api.metrics.MeterProvider;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The entrypoint for OpenTelemetry metrics functionality in gRPC.
|
||||
|
@ -48,28 +59,38 @@ public final class OpenTelemetryModule {
|
|||
}
|
||||
};
|
||||
|
||||
private final OpenTelemetry openTelemetryInstance;
|
||||
private final OpenTelemetry openTelemetrySdk;
|
||||
private final MeterProvider meterProvider;
|
||||
private final Meter meter;
|
||||
private final Map<String, Boolean> enableMetrics;
|
||||
private final boolean disableDefault;
|
||||
private final OpenTelemetryMetricsResource resource;
|
||||
private final OpenTelemetryMetricsModule openTelemetryMetricsModule;
|
||||
private final List<String> optionalLabels;
|
||||
private final MetricSink sink;
|
||||
|
||||
public static Builder newBuilder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
private OpenTelemetryModule(Builder builder) {
|
||||
this.openTelemetryInstance = checkNotNull(builder.openTelemetrySdk, "openTelemetrySdk");
|
||||
this.meterProvider = checkNotNull(openTelemetryInstance.getMeterProvider(), "meterProvider");
|
||||
this.openTelemetrySdk = checkNotNull(builder.openTelemetrySdk, "openTelemetrySdk");
|
||||
this.meterProvider = checkNotNull(openTelemetrySdk.getMeterProvider(), "meterProvider");
|
||||
this.meter = this.meterProvider
|
||||
.meterBuilder(OpenTelemetryConstants.INSTRUMENTATION_SCOPE)
|
||||
.setInstrumentationVersion(IMPLEMENTATION_VERSION)
|
||||
.build();
|
||||
this.resource = createMetricInstruments(meter);
|
||||
this.enableMetrics = ImmutableMap.copyOf(builder.enableMetrics);
|
||||
this.disableDefault = builder.disableAll;
|
||||
this.resource = createMetricInstruments(meter, enableMetrics, disableDefault);
|
||||
this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule(STOPWATCH_SUPPLIER, resource);
|
||||
this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels);
|
||||
this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
OpenTelemetry getOpenTelemetryInstance() {
|
||||
return this.openTelemetryInstance;
|
||||
return this.openTelemetrySdk;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -87,105 +108,170 @@ public final class OpenTelemetryModule {
|
|||
return this.resource;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link ClientInterceptor} with metrics implementation.
|
||||
*/
|
||||
public ClientInterceptor getClientInterceptor() {
|
||||
OpenTelemetryMetricsModule openTelemetryMetricsModule =
|
||||
new OpenTelemetryMetricsModule(
|
||||
STOPWATCH_SUPPLIER,
|
||||
resource);
|
||||
return openTelemetryMetricsModule.getClientInterceptor();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link ServerStreamTracer.Factory} with metrics implementation.
|
||||
*/
|
||||
public ServerStreamTracer.Factory getServerStreamTracerFactory() {
|
||||
OpenTelemetryMetricsModule openTelemetryMetricsModule =
|
||||
new OpenTelemetryMetricsModule(
|
||||
STOPWATCH_SUPPLIER,
|
||||
resource);
|
||||
return openTelemetryMetricsModule.getServerTracerFactory();
|
||||
@VisibleForTesting
|
||||
Map<String, Boolean> getEnableMetrics() {
|
||||
return this.enableMetrics;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static OpenTelemetryMetricsResource createMetricInstruments(Meter meter) {
|
||||
List<String> getOptionalLabels() {
|
||||
return optionalLabels;
|
||||
}
|
||||
|
||||
MetricSink getSink() {
|
||||
return sink;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers OpenTelemetryModule globally, applying its configuration to all subsequently created
|
||||
* gRPC channels and servers.
|
||||
*/
|
||||
public void registerGlobal() {
|
||||
ConfiguratorRegistry.getDefaultRegistry().setConfigurators(Collections.singletonList(
|
||||
new Configurator() {
|
||||
@Override
|
||||
public void configureChannelBuilder(ManagedChannelBuilder<?> channelBuilder) {
|
||||
OpenTelemetryModule.this.configureChannelBuilder(channelBuilder);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureServerBuilder(ServerBuilder<?> serverBuilder) {
|
||||
OpenTelemetryModule.this.configureServerBuilder(serverBuilder);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures the given {@link ManagedChannelBuilder} with OpenTelemetry metrics instrumentation.
|
||||
*/
|
||||
public void configureChannelBuilder(ManagedChannelBuilder<?> builder) {
|
||||
builder.addMetricSink(sink);
|
||||
builder.intercept(openTelemetryMetricsModule.getClientInterceptor());
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures the given {@link ServerBuilder} with OpenTelemetry metrics instrumentation.
|
||||
*
|
||||
* @param serverBuilder the server builder to configure
|
||||
*/
|
||||
public void configureServerBuilder(ServerBuilder<?> serverBuilder) {
|
||||
serverBuilder.addStreamTracerFactory(openTelemetryMetricsModule.getServerTracerFactory());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static OpenTelemetryMetricsResource createMetricInstruments(Meter meter,
|
||||
Map<String, Boolean> enableMetrics, boolean disableDefault) {
|
||||
OpenTelemetryMetricsResource.Builder builder = OpenTelemetryMetricsResource.builder();
|
||||
|
||||
builder.clientCallDurationCounter(
|
||||
meter.histogramBuilder("grpc.client.call.duration")
|
||||
.setUnit("s")
|
||||
.setDescription(
|
||||
"Time taken by gRPC to complete an RPC from application's perspective")
|
||||
.build());
|
||||
if (isMetricEnabled("grpc.client.call.duration", enableMetrics, disableDefault)) {
|
||||
builder.clientCallDurationCounter(
|
||||
meter.histogramBuilder("grpc.client.call.duration")
|
||||
.setUnit("s")
|
||||
.setDescription(
|
||||
"Time taken by gRPC to complete an RPC from application's perspective")
|
||||
.build());
|
||||
}
|
||||
|
||||
builder.clientAttemptCountCounter(
|
||||
meter.counterBuilder("grpc.client.attempt.started")
|
||||
.setUnit("{attempt}")
|
||||
.setDescription("Number of client call attempts started")
|
||||
.build());
|
||||
if (isMetricEnabled("grpc.client.attempt.started", enableMetrics, disableDefault)) {
|
||||
builder.clientAttemptCountCounter(
|
||||
meter.counterBuilder("grpc.client.attempt.started")
|
||||
.setUnit("{attempt}")
|
||||
.setDescription("Number of client call attempts started")
|
||||
.build());
|
||||
}
|
||||
|
||||
builder.clientAttemptDurationCounter(
|
||||
meter.histogramBuilder(
|
||||
"grpc.client.attempt.duration")
|
||||
.setUnit("s")
|
||||
.setDescription("Time taken to complete a client call attempt")
|
||||
.build());
|
||||
if (isMetricEnabled("grpc.client.attempt.duration", enableMetrics, disableDefault)) {
|
||||
builder.clientAttemptDurationCounter(
|
||||
meter.histogramBuilder(
|
||||
"grpc.client.attempt.duration")
|
||||
.setUnit("s")
|
||||
.setDescription("Time taken to complete a client call attempt")
|
||||
.build());
|
||||
}
|
||||
|
||||
builder.clientTotalSentCompressedMessageSizeCounter(
|
||||
meter.histogramBuilder(
|
||||
"grpc.client.attempt.sent_total_compressed_message_size")
|
||||
.setUnit("By")
|
||||
.setDescription("Compressed message bytes sent per client call attempt")
|
||||
.ofLongs()
|
||||
.build());
|
||||
if (isMetricEnabled("grpc.client.attempt.sent_total_compressed_message_size", enableMetrics,
|
||||
disableDefault)) {
|
||||
builder.clientTotalSentCompressedMessageSizeCounter(
|
||||
meter.histogramBuilder(
|
||||
"grpc.client.attempt.sent_total_compressed_message_size")
|
||||
.setUnit("By")
|
||||
.setDescription("Compressed message bytes sent per client call attempt")
|
||||
.ofLongs()
|
||||
.build());
|
||||
}
|
||||
|
||||
builder.clientTotalReceivedCompressedMessageSizeCounter(
|
||||
meter.histogramBuilder(
|
||||
"grpc.client.attempt.rcvd_total_compressed_message_size")
|
||||
.setUnit("By")
|
||||
.setDescription("Compressed message bytes received per call attempt")
|
||||
.ofLongs()
|
||||
.build());
|
||||
if (isMetricEnabled("grpc.client.attempt.rcvd_total_compressed_message_size", enableMetrics,
|
||||
disableDefault)) {
|
||||
builder.clientTotalReceivedCompressedMessageSizeCounter(
|
||||
meter.histogramBuilder(
|
||||
"grpc.client.attempt.rcvd_total_compressed_message_size")
|
||||
.setUnit("By")
|
||||
.setDescription("Compressed message bytes received per call attempt")
|
||||
.ofLongs()
|
||||
.build());
|
||||
}
|
||||
|
||||
builder.serverCallCountCounter(
|
||||
meter.counterBuilder("grpc.server.call.started")
|
||||
.setUnit("{call}")
|
||||
.setDescription("Number of server calls started")
|
||||
.build());
|
||||
if (isMetricEnabled("grpc.server.call.started", enableMetrics, disableDefault)) {
|
||||
builder.serverCallCountCounter(
|
||||
meter.counterBuilder("grpc.server.call.started")
|
||||
.setUnit("{call}")
|
||||
.setDescription("Number of server calls started")
|
||||
.build());
|
||||
}
|
||||
|
||||
builder.serverCallDurationCounter(
|
||||
meter.histogramBuilder("grpc.server.call.duration")
|
||||
.setUnit("s")
|
||||
.setDescription(
|
||||
"Time taken to complete a call from server transport's perspective")
|
||||
.build());
|
||||
if (isMetricEnabled("grpc.server.call.duration", enableMetrics, disableDefault)) {
|
||||
builder.serverCallDurationCounter(
|
||||
meter.histogramBuilder("grpc.server.call.duration")
|
||||
.setUnit("s")
|
||||
.setDescription(
|
||||
"Time taken to complete a call from server transport's perspective")
|
||||
.build());
|
||||
}
|
||||
|
||||
builder.serverTotalSentCompressedMessageSizeCounter(
|
||||
meter.histogramBuilder(
|
||||
"grpc.server.call.sent_total_compressed_message_size")
|
||||
.setUnit("By")
|
||||
.setDescription("Compressed message bytes sent per server call")
|
||||
.ofLongs()
|
||||
.build());
|
||||
if (isMetricEnabled("grpc.server.call.sent_total_compressed_message_size", enableMetrics,
|
||||
disableDefault)) {
|
||||
builder.serverTotalSentCompressedMessageSizeCounter(
|
||||
meter.histogramBuilder(
|
||||
"grpc.server.call.sent_total_compressed_message_size")
|
||||
.setUnit("By")
|
||||
.setDescription("Compressed message bytes sent per server call")
|
||||
.ofLongs()
|
||||
.build());
|
||||
}
|
||||
|
||||
builder.serverTotalReceivedCompressedMessageSizeCounter(
|
||||
meter.histogramBuilder(
|
||||
"grpc.server.call.rcvd_total_compressed_message_size")
|
||||
.setUnit("By")
|
||||
.setDescription("Compressed message bytes received per server call")
|
||||
.ofLongs()
|
||||
.build());
|
||||
if (isMetricEnabled("grpc.server.call.rcvd_total_compressed_message_size", enableMetrics,
|
||||
disableDefault)) {
|
||||
builder.serverTotalReceivedCompressedMessageSizeCounter(
|
||||
meter.histogramBuilder(
|
||||
"grpc.server.call.rcvd_total_compressed_message_size")
|
||||
.setUnit("By")
|
||||
.setDescription("Compressed message bytes received per server call")
|
||||
.ofLongs()
|
||||
.build());
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
static boolean isMetricEnabled(String metricName, Map<String, Boolean> enableMetrics,
|
||||
boolean disableDefault) {
|
||||
Boolean explicitlyEnabled = enableMetrics.get(metricName);
|
||||
if (explicitlyEnabled != null) {
|
||||
return explicitlyEnabled;
|
||||
}
|
||||
return OpenTelemetryMetricsModule.DEFAULT_PER_CALL_METRICS_SET.contains(metricName)
|
||||
&& !disableDefault;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Builder for configuring {@link OpenTelemetryModule}.
|
||||
*/
|
||||
public static class Builder {
|
||||
private OpenTelemetry openTelemetrySdk = OpenTelemetry.noop();
|
||||
private final Collection<String> optionalLabels = new ArrayList<>();
|
||||
private final Map<String, Boolean> enableMetrics = new HashMap<>();
|
||||
private boolean disableAll;
|
||||
|
||||
private Builder() {}
|
||||
|
||||
|
@ -199,6 +285,45 @@ public final class OpenTelemetryModule {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds optionalLabelKey to all the metrics that can provide value for the
|
||||
* optionalLabelKey.
|
||||
*/
|
||||
public Builder addOptionalLabel(String optionalLabelKey) {
|
||||
this.optionalLabels.add(optionalLabelKey);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enables the specified metrics for collection and export. By default, only a subset of
|
||||
* metrics are enabled.
|
||||
*/
|
||||
public Builder enableMetrics(Collection<String> enableMetrics) {
|
||||
for (String metric : enableMetrics) {
|
||||
this.enableMetrics.put(metric, true);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Disables the specified metrics from being collected and exported.
|
||||
*/
|
||||
public Builder disableMetrics(Collection<String> disableMetrics) {
|
||||
for (String metric : disableMetrics) {
|
||||
this.enableMetrics.put(metric, false);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Disable all metrics. If set to true all metrics must be explicitly enabled.
|
||||
*/
|
||||
public Builder disableAllMetrics() {
|
||||
this.enableMetrics.clear();
|
||||
this.disableAll = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new {@link OpenTelemetryModule} built with the configuration of this {@link
|
||||
* Builder}.
|
||||
|
|
|
@ -0,0 +1,347 @@
|
|||
/*
|
||||
* Copyright 2024 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.opentelemetry;
|
||||
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.grpc.DoubleCounterMetricInstrument;
|
||||
import io.grpc.DoubleHistogramMetricInstrument;
|
||||
import io.grpc.LongCounterMetricInstrument;
|
||||
import io.grpc.LongHistogramMetricInstrument;
|
||||
import io.grpc.MetricInstrument;
|
||||
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.metrics.DoubleCounter;
|
||||
import io.opentelemetry.api.metrics.DoubleHistogram;
|
||||
import io.opentelemetry.api.metrics.LongCounter;
|
||||
import io.opentelemetry.api.metrics.Meter;
|
||||
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
|
||||
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
@RunWith(JUnit4.class)
|
||||
public class OpenTelemetryMetricSinkTest {
|
||||
|
||||
@Rule
|
||||
public final OpenTelemetryRule openTelemetryTesting = OpenTelemetryRule.create();
|
||||
|
||||
private final Meter testMeter = openTelemetryTesting.getOpenTelemetry()
|
||||
.getMeter(OpenTelemetryConstants.INSTRUMENTATION_SCOPE);
|
||||
|
||||
private OpenTelemetryMetricSink sink;
|
||||
|
||||
@Test
|
||||
public void updateMeasures_enabledMetrics() {
|
||||
Map<String, Boolean> enabledMetrics = new HashMap<>();
|
||||
enabledMetrics.put("client_calls_started", true);
|
||||
enabledMetrics.put("server_calls_started", true);
|
||||
|
||||
List<String> optionalLabels = Arrays.asList("status");
|
||||
|
||||
List<MetricInstrument> instruments = Arrays.asList(
|
||||
new DoubleCounterMetricInstrument(0, "client_calls_started",
|
||||
"Number of client calls started", "count", Collections.emptyList(),
|
||||
Collections.emptyList(),
|
||||
true),
|
||||
new LongCounterMetricInstrument(1, "server_calls_started", "Number of server calls started",
|
||||
"count", Collections.emptyList(), Collections.emptyList(), false),
|
||||
new DoubleHistogramMetricInstrument(2, "client_message_size", "Sent message size", "bytes",
|
||||
Collections.emptyList(),
|
||||
Collections.emptyList(), Collections.emptyList(), true)
|
||||
);
|
||||
|
||||
// Create sink
|
||||
sink = new OpenTelemetryMetricSink(testMeter, enabledMetrics, false, optionalLabels);
|
||||
|
||||
// Invoke updateMeasures
|
||||
sink.updateMeasures(instruments);
|
||||
|
||||
com.google.common.truth.Truth.assertThat(sink.getMeasuresSize()).isEqualTo(3);
|
||||
// Metric is explicitly enabled for sink
|
||||
com.google.common.truth.Truth.assertThat(sink.getMeasures().get(0).getMeasure())
|
||||
.isInstanceOf(DoubleCounter.class);
|
||||
// Metric is explicitly enabled for sink
|
||||
com.google.common.truth.Truth.assertThat(sink.getMeasures().get(1).getMeasure())
|
||||
.isInstanceOf(LongCounter.class);
|
||||
// Metric is enabled by default
|
||||
com.google.common.truth.Truth.assertThat(sink.getMeasures().get(2).getMeasure())
|
||||
.isInstanceOf(DoubleHistogram.class);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void updateMeasure_disabledMetrics() {
|
||||
Map<String, Boolean> enabledMetrics = new HashMap<>();
|
||||
enabledMetrics.put("client_calls_started", false);
|
||||
enabledMetrics.put("server_calls_started", false);
|
||||
|
||||
List<String> optionalLabels = Arrays.asList("status");
|
||||
|
||||
List<MetricInstrument> instruments = Arrays.asList(
|
||||
new DoubleCounterMetricInstrument(0, "client_calls_started",
|
||||
"Number of client calls started", "count", Collections.emptyList(),
|
||||
Collections.emptyList(), true),
|
||||
new LongCounterMetricInstrument(1, "server_calls_started", "Number of server calls started",
|
||||
"count", Collections.emptyList(), Collections.emptyList(), true),
|
||||
new DoubleHistogramMetricInstrument(2, "client_message_size", "Sent message size", "bytes",
|
||||
Collections.emptyList(),
|
||||
Collections.emptyList(), Collections.emptyList(), true)
|
||||
);
|
||||
|
||||
// Create sink
|
||||
sink = new OpenTelemetryMetricSink(testMeter, enabledMetrics, true, optionalLabels);
|
||||
|
||||
// Invoke updateMeasures
|
||||
sink.updateMeasures(instruments);
|
||||
|
||||
com.google.common.truth.Truth.assertThat(sink.getMeasuresSize()).isEqualTo(3);
|
||||
// Metric is explicitly disabled
|
||||
com.google.common.truth.Truth.assertThat(sink.getMeasures().get(0)).isNull();
|
||||
// Metric is explicitly disabled
|
||||
com.google.common.truth.Truth.assertThat(sink.getMeasures().get(1)).isNull();
|
||||
// Metric is enabled by default, but all default metrics are disabled
|
||||
com.google.common.truth.Truth.assertThat(sink.getMeasures().get(2)).isNull();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void addCounter_enabledMetric() {
|
||||
// set up sink with disabled metric
|
||||
Map<String, Boolean> enabledMetrics = new HashMap<>();
|
||||
enabledMetrics.put("client_latency", true);
|
||||
|
||||
LongCounterMetricInstrument longCounterInstrument =
|
||||
new LongCounterMetricInstrument(0, "client_latency", "Client latency", "s",
|
||||
Collections.emptyList(),
|
||||
Collections.emptyList(), false);
|
||||
DoubleCounterMetricInstrument doubleCounterInstrument =
|
||||
new DoubleCounterMetricInstrument(1, "client_calls_started",
|
||||
"Number of client calls started", "count", Collections.emptyList(),
|
||||
Collections.emptyList(),
|
||||
true);
|
||||
// Create sink
|
||||
sink = new OpenTelemetryMetricSink(testMeter, enabledMetrics, false, Collections.emptyList());
|
||||
|
||||
// Invoke updateMeasures
|
||||
sink.updateMeasures(Arrays.asList(longCounterInstrument, doubleCounterInstrument));
|
||||
|
||||
sink.addLongCounter(longCounterInstrument, 123L, Collections.emptyList(),
|
||||
Collections.emptyList());
|
||||
sink.addDoubleCounter(doubleCounterInstrument, 12.0, Collections.emptyList(),
|
||||
Collections.emptyList());
|
||||
|
||||
assertThat(openTelemetryTesting.getMetrics())
|
||||
.satisfiesExactlyInAnyOrder(
|
||||
metric ->
|
||||
assertThat(metric)
|
||||
.hasInstrumentationScope(InstrumentationScopeInfo.create(
|
||||
OpenTelemetryConstants.INSTRUMENTATION_SCOPE))
|
||||
.hasName("client_latency")
|
||||
.hasDescription("Client latency")
|
||||
.hasUnit("s")
|
||||
.hasLongSumSatisfying(
|
||||
longSum ->
|
||||
longSum
|
||||
.hasPointsSatisfying(
|
||||
point ->
|
||||
point
|
||||
.hasValue(123L))),
|
||||
metric ->
|
||||
assertThat(metric)
|
||||
.hasInstrumentationScope(InstrumentationScopeInfo.create(
|
||||
OpenTelemetryConstants.INSTRUMENTATION_SCOPE))
|
||||
.hasName("client_calls_started")
|
||||
.hasDescription("Number of client calls started")
|
||||
.hasUnit("count")
|
||||
.hasDoubleSumSatisfying(
|
||||
doubleSum ->
|
||||
doubleSum
|
||||
.hasPointsSatisfying(
|
||||
point ->
|
||||
point
|
||||
.hasValue(12.0D))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void addCounter_disabledMetric() {
|
||||
// set up sink with disabled metric
|
||||
Map<String, Boolean> enabledMetrics = new HashMap<>();
|
||||
enabledMetrics.put("client_latency", false);
|
||||
|
||||
LongCounterMetricInstrument instrument =
|
||||
new LongCounterMetricInstrument(0, "client_latency", "Client latency", "s",
|
||||
Collections.emptyList(),
|
||||
Collections.emptyList(), true);
|
||||
// Create sink
|
||||
sink = new OpenTelemetryMetricSink(testMeter, enabledMetrics, true, Collections.emptyList());
|
||||
|
||||
// Invoke updateMeasures
|
||||
sink.updateMeasures(Arrays.asList(instrument));
|
||||
|
||||
sink.addLongCounter(instrument, 123L, Collections.emptyList(), Collections.emptyList());
|
||||
|
||||
assertThat(openTelemetryTesting.getMetrics()).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void addHistogram_enabledMetric() {
|
||||
// set up sink with disabled metric
|
||||
Map<String, Boolean> enabledMetrics = new HashMap<>();
|
||||
enabledMetrics.put("client_message_size", true);
|
||||
enabledMetrics.put("server_message_size", true);
|
||||
|
||||
DoubleHistogramMetricInstrument doubleHistogramInstrument =
|
||||
new DoubleHistogramMetricInstrument(0, "client_message_size", "Sent message size", "bytes",
|
||||
Collections.emptyList(),
|
||||
Collections.emptyList(), Collections.emptyList(), false);
|
||||
LongHistogramMetricInstrument longHistogramInstrument =
|
||||
new LongHistogramMetricInstrument(1, "server_message_size", "Received message size",
|
||||
"bytes",
|
||||
Collections.emptyList(),
|
||||
Collections.emptyList(), Collections.emptyList(), true);
|
||||
|
||||
// Create sink
|
||||
sink = new OpenTelemetryMetricSink(testMeter, enabledMetrics, false, Collections.emptyList());
|
||||
|
||||
// Invoke updateMeasures
|
||||
sink.updateMeasures(Arrays.asList(doubleHistogramInstrument, longHistogramInstrument));
|
||||
|
||||
sink.recordDoubleHistogram(doubleHistogramInstrument, 12.0, Collections.emptyList(),
|
||||
Collections.emptyList());
|
||||
sink.recordLongHistogram(longHistogramInstrument, 123L, Collections.emptyList(),
|
||||
Collections.emptyList());
|
||||
|
||||
assertThat(openTelemetryTesting.getMetrics())
|
||||
.satisfiesExactlyInAnyOrder(
|
||||
metric ->
|
||||
assertThat(metric)
|
||||
.hasInstrumentationScope(InstrumentationScopeInfo.create(
|
||||
OpenTelemetryConstants.INSTRUMENTATION_SCOPE))
|
||||
.hasName("client_message_size")
|
||||
.hasDescription("Sent message size")
|
||||
.hasUnit("bytes")
|
||||
.hasHistogramSatisfying(
|
||||
histogram ->
|
||||
histogram.hasPointsSatisfying(
|
||||
point ->
|
||||
point
|
||||
.hasCount(1)
|
||||
.hasSum(12.0))),
|
||||
|
||||
metric ->
|
||||
assertThat(metric)
|
||||
.hasInstrumentationScope(InstrumentationScopeInfo.create(
|
||||
OpenTelemetryConstants.INSTRUMENTATION_SCOPE))
|
||||
.hasName("server_message_size")
|
||||
.hasDescription("Received message size")
|
||||
.hasUnit("bytes")
|
||||
.hasHistogramSatisfying(
|
||||
histogram ->
|
||||
histogram.hasPointsSatisfying(
|
||||
point ->
|
||||
point
|
||||
.hasCount(1)
|
||||
.hasSum(123L))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void addHistogram_disabledMetric() {
|
||||
// set up sink with disabled metric
|
||||
Map<String, Boolean> enabledMetrics = new HashMap<>();
|
||||
enabledMetrics.put("client_message_size", false);
|
||||
enabledMetrics.put("server_message_size", false);
|
||||
|
||||
DoubleHistogramMetricInstrument doubleHistogramInstrument =
|
||||
new DoubleHistogramMetricInstrument(0, "client_message_size", "Sent message size", "bytes",
|
||||
Collections.emptyList(),
|
||||
Collections.emptyList(), Collections.emptyList(), false);
|
||||
LongHistogramMetricInstrument longHistogramInstrument =
|
||||
new LongHistogramMetricInstrument(1, "server_message_size", "Received message size",
|
||||
"bytes",
|
||||
Collections.emptyList(),
|
||||
Collections.emptyList(), Collections.emptyList(), true);
|
||||
|
||||
// Create sink
|
||||
sink = new OpenTelemetryMetricSink(testMeter, enabledMetrics, false, Collections.emptyList());
|
||||
|
||||
// Invoke updateMeasures
|
||||
sink.updateMeasures(Arrays.asList(doubleHistogramInstrument, longHistogramInstrument));
|
||||
|
||||
sink.recordDoubleHistogram(doubleHistogramInstrument, 12.0, Collections.emptyList(),
|
||||
Collections.emptyList());
|
||||
sink.recordLongHistogram(longHistogramInstrument, 123L, Collections.emptyList(),
|
||||
Collections.emptyList());
|
||||
|
||||
assertThat(openTelemetryTesting.getMetrics()).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void recordLabels() {
|
||||
Map<String, Boolean> enabledMetrics = new HashMap<>();
|
||||
enabledMetrics.put("client_latency", true);
|
||||
|
||||
List<String> optionalLabels = Arrays.asList("optional_label_key_2");
|
||||
|
||||
LongCounterMetricInstrument longCounterInstrument =
|
||||
new LongCounterMetricInstrument(0, "client_latency", "Client latency", "s",
|
||||
ImmutableList.of("required_label_key_1", "required_label_key_2"),
|
||||
ImmutableList.of("optional_label_key_1", "optional_label_key_2"), false);
|
||||
|
||||
// Create sink
|
||||
sink = new OpenTelemetryMetricSink(testMeter, enabledMetrics, false, optionalLabels);
|
||||
|
||||
// Invoke updateMeasures
|
||||
sink.updateMeasures(Arrays.asList(longCounterInstrument));
|
||||
|
||||
sink.addLongCounter(longCounterInstrument, 123L,
|
||||
ImmutableList.of("required_label_value_1", "required_label_value_2"),
|
||||
ImmutableList.of("optional_label_value_1", "optional_label_value_2"));
|
||||
|
||||
io.opentelemetry.api.common.Attributes expectedAtrributes
|
||||
= io.opentelemetry.api.common.Attributes.of(
|
||||
AttributeKey.stringKey("required_label_key_1"), "required_label_value_1",
|
||||
AttributeKey.stringKey("required_label_key_2"), "required_label_value_2",
|
||||
AttributeKey.stringKey("optional_label_key_2"), "optional_label_value_2");
|
||||
|
||||
assertThat(openTelemetryTesting.getMetrics())
|
||||
.satisfiesExactlyInAnyOrder(
|
||||
metric ->
|
||||
assertThat(metric)
|
||||
.hasInstrumentationScope(InstrumentationScopeInfo.create(
|
||||
OpenTelemetryConstants.INSTRUMENTATION_SCOPE))
|
||||
.hasName("client_latency")
|
||||
.hasDescription("Client latency")
|
||||
.hasUnit("s")
|
||||
.hasLongSumSatisfying(
|
||||
longSum ->
|
||||
longSum
|
||||
.hasPointsSatisfying(
|
||||
point ->
|
||||
point
|
||||
.hasAttributes(expectedAtrributes)
|
||||
.hasValue(123L))));
|
||||
}
|
||||
}
|
|
@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
|
|||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
|
@ -50,6 +51,7 @@ import io.opentelemetry.api.metrics.Meter;
|
|||
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
|
||||
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
|
||||
import java.io.InputStream;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -141,6 +143,9 @@ public class OpenTelemetryMetricsModuleTest {
|
|||
.setSampledToLocalTracing(true)
|
||||
.build();
|
||||
private Meter testMeter;
|
||||
private final Map<String, Boolean> enabledMetricsMap = ImmutableMap.of();
|
||||
|
||||
private final boolean disableDefaultMetrics = false;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
@ -150,7 +155,8 @@ public class OpenTelemetryMetricsModuleTest {
|
|||
|
||||
@Test
|
||||
public void testClientInterceptors() {
|
||||
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter);
|
||||
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
|
||||
enabledMetricsMap, disableDefaultMetrics);
|
||||
OpenTelemetryMetricsModule module =
|
||||
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
|
||||
grpcServerRule.getServiceRegistry().addService(
|
||||
|
@ -205,7 +211,8 @@ public class OpenTelemetryMetricsModuleTest {
|
|||
|
||||
@Test
|
||||
public void clientBasicMetrics() {
|
||||
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter);;
|
||||
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
|
||||
enabledMetricsMap, disableDefaultMetrics);
|
||||
OpenTelemetryMetricsModule module =
|
||||
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
|
||||
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
|
||||
|
@ -339,7 +346,8 @@ public class OpenTelemetryMetricsModuleTest {
|
|||
// This test is only unit-testing the metrics recording logic. The retry behavior is faked.
|
||||
@Test
|
||||
public void recordAttemptMetrics() {
|
||||
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter);
|
||||
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
|
||||
enabledMetricsMap, disableDefaultMetrics);
|
||||
OpenTelemetryMetricsModule module =
|
||||
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
|
||||
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
|
||||
|
@ -759,7 +767,8 @@ public class OpenTelemetryMetricsModuleTest {
|
|||
|
||||
@Test
|
||||
public void clientStreamNeverCreatedStillRecordMetrics() {
|
||||
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter);
|
||||
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
|
||||
enabledMetricsMap, disableDefaultMetrics);
|
||||
OpenTelemetryMetricsModule module =
|
||||
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
|
||||
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
|
||||
|
@ -860,7 +869,8 @@ public class OpenTelemetryMetricsModuleTest {
|
|||
|
||||
@Test
|
||||
public void serverBasicMetrics() {
|
||||
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter);
|
||||
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
|
||||
enabledMetricsMap, disableDefaultMetrics);
|
||||
OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
|
||||
fakeClock.getStopwatchSupplier(), resource);
|
||||
ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory();
|
||||
|
|
|
@ -18,11 +18,14 @@ package io.grpc.opentelemetry;
|
|||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.grpc.MetricSink;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.opentelemetry.api.OpenTelemetry;
|
||||
import io.opentelemetry.sdk.OpenTelemetrySdk;
|
||||
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
|
||||
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
|
||||
import java.util.Arrays;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
@ -38,8 +41,10 @@ public class OpenTelemetryModuleTest {
|
|||
public void build() {
|
||||
OpenTelemetrySdk sdk =
|
||||
OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();
|
||||
|
||||
OpenTelemetryModule openTelemetryModule = OpenTelemetryModule.newBuilder()
|
||||
.sdk(sdk)
|
||||
.addOptionalLabel("version")
|
||||
.build();
|
||||
|
||||
assertThat(openTelemetryModule.getOpenTelemetryInstance()).isSameInstanceAs(sdk);
|
||||
|
@ -48,6 +53,7 @@ public class OpenTelemetryModuleTest {
|
|||
meterProvider.meterBuilder("grpc-java")
|
||||
.setInstrumentationVersion(GrpcUtil.IMPLEMENTATION_VERSION)
|
||||
.build());
|
||||
assertThat(openTelemetryModule.getOptionalLabels()).isEqualTo(ImmutableList.of("version"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -64,5 +70,37 @@ public class OpenTelemetryModuleTest {
|
|||
.meterBuilder("grpc-java")
|
||||
.setInstrumentationVersion(GrpcUtil.IMPLEMENTATION_VERSION)
|
||||
.build());
|
||||
assertThat(module.getEnableMetrics()).isEmpty();
|
||||
assertThat(module.getOptionalLabels()).isEmpty();
|
||||
assertThat(module.getSink()).isInstanceOf(MetricSink.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void enableDisableMetrics() {
|
||||
OpenTelemetryModule.Builder builder = OpenTelemetryModule.newBuilder();
|
||||
builder.enableMetrics(Arrays.asList("metric1", "metric4"));
|
||||
builder.disableMetrics(Arrays.asList("metric2", "metric3"));
|
||||
|
||||
OpenTelemetryModule module = builder.build();
|
||||
|
||||
assertThat(module.getEnableMetrics().get("metric1")).isTrue();
|
||||
assertThat(module.getEnableMetrics().get("metric4")).isTrue();
|
||||
assertThat(module.getEnableMetrics().get("metric2")).isFalse();
|
||||
assertThat(module.getEnableMetrics().get("metric3")).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void disableAllMetrics() {
|
||||
OpenTelemetryModule.Builder builder = OpenTelemetryModule.newBuilder();
|
||||
builder.enableMetrics(Arrays.asList("metric1", "metric4"));
|
||||
builder.disableMetrics(Arrays.asList("metric2", "metric3"));
|
||||
builder.disableAllMetrics();
|
||||
|
||||
OpenTelemetryModule module = builder.build();
|
||||
|
||||
assertThat(module.getEnableMetrics()).isEmpty();
|
||||
}
|
||||
|
||||
// TODO(dnvindhya): Add tests for configurator
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue