Add gauge metric API and Otel implementation

This is needed by gRFC A78 for xds metrics, and for RLS metrics. Since
gauges need to acquire a lock (or other synchronization) in the
callback, the callback allows batching multiple gauges together to avoid
acquiring-and-requiring such locks.

Unlike other metrics, gauges are reported on-demand to the MetricSink.
This means not all sinks will receive the same data, as the sinks will
ask for the gauges at different times.
This commit is contained in:
Eric Anderson 2024-05-06 11:38:04 -07:00
parent 1994125c78
commit b6f7b693e7
8 changed files with 365 additions and 2 deletions

View File

@ -0,0 +1,23 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc;
/**
* Tagging interface for MetricInstruments that can be used with batch callbacks.
*/
@Internal
public interface CallbackMetricInstrument extends MetricInstrument {}

View File

@ -22,7 +22,8 @@ import java.util.List;
* Represents a long-valued gauge metric instrument.
*/
@Internal
public final class LongGaugeMetricInstrument extends PartialMetricInstrument {
public final class LongGaugeMetricInstrument extends PartialMetricInstrument
implements CallbackMetricInstrument {
public LongGaugeMetricInstrument(int index, String name, String description, String unit,
List<String> requiredLabelKeys, List<String> optionalLabelKeys, boolean enableByDefault) {
super(index, name, description, unit, requiredLabelKeys, optionalLabelKeys, enableByDefault);

View File

@ -67,4 +67,43 @@ public interface MetricRecorder {
*/
default void recordLongHistogram(LongHistogramMetricInstrument metricInstrument, long value,
List<String> requiredLabelValues, List<String> optionalLabelValues) {}
/**
* Registers a callback to produce metric values for only the listed instruments. The returned
* registration must be closed when no longer needed, which will remove the callback.
*
* @param callback The callback to call to record.
* @param metricInstruments The metric instruments the callback will record against.
*/
default Registration registerBatchCallback(BatchCallback callback,
CallbackMetricInstrument... metricInstruments) {
return () -> { };
}
/** Callback to record gauge values. */
interface BatchCallback {
/** Records instrument values into {@code recorder}. */
void accept(BatchRecorder recorder);
}
/** Recorder for instrument values produced by a batch callback. */
interface BatchRecorder {
/**
* Record a long gauge value.
*
* @param value The value to record.
* @param requiredLabelValues A list of required label values for the metric.
* @param optionalLabelValues A list of additional, optional label values for the metric.
*/
void recordLongGauge(LongGaugeMetricInstrument metricInstrument, long value,
List<String> requiredLabelValues, List<String> optionalLabelValues);
}
/** A handle to a registration, that allows unregistration. */
interface Registration extends AutoCloseable {
// Redefined to not throw an exception.
/** Unregister. */
@Override
void close();
}
}

View File

@ -99,5 +99,30 @@ public interface MetricSink {
List<String> requiredLabelValues, List<String> optionalLabelValues) {
}
/**
* Record a long gauge value.
*
* @param value The value to record.
* @param requiredLabelValues A list of required label values for the metric.
* @param optionalLabelValues A list of additional, optional label values for the metric.
*/
default void recordLongGauge(LongGaugeMetricInstrument metricInstrument, long value,
List<String> requiredLabelValues, List<String> optionalLabelValues){
}
/**
* Registers a callback to produce metric values for only the listed instruments. The returned
* registration must be closed when no longer needed, which will remove the callback.
*
* @param callback The callback to call to record.
* @param metricInstruments The metric instruments the callback will record against.
*/
default Registration registerBatchCallback(Runnable callback,
CallbackMetricInstrument... metricInstruments) {
return () -> { };
}
interface Registration extends MetricRecorder.Registration {}
void updateMeasures(List<MetricInstrument> instruments);
}

View File

@ -17,16 +17,21 @@
package io.grpc.internal;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallbackMetricInstrument;
import io.grpc.DoubleCounterMetricInstrument;
import io.grpc.DoubleHistogramMetricInstrument;
import io.grpc.LongCounterMetricInstrument;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.LongHistogramMetricInstrument;
import io.grpc.MetricInstrument;
import io.grpc.MetricInstrumentRegistry;
import io.grpc.MetricRecorder;
import io.grpc.MetricSink;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
/**
@ -171,4 +176,62 @@ final class MetricRecorderImpl implements MetricRecorder {
sink.recordLongHistogram(metricInstrument, value, requiredLabelValues, optionalLabelValues);
}
}
@Override
public Registration registerBatchCallback(BatchCallback callback,
CallbackMetricInstrument... metricInstruments) {
long largestMetricInstrumentIndex = -1;
BitSet allowedInstruments = new BitSet();
for (CallbackMetricInstrument metricInstrument : metricInstruments) {
largestMetricInstrumentIndex =
Math.max(largestMetricInstrumentIndex, metricInstrument.getIndex());
allowedInstruments.set(metricInstrument.getIndex());
}
List<MetricSink.Registration> registrations = new ArrayList<>();
for (MetricSink sink : metricSinks) {
int measuresSize = sink.getMeasuresSize();
if (measuresSize <= largestMetricInstrumentIndex) {
// Measures may need updating in two cases:
// 1. When the sink is initially created with an empty list of measures.
// 2. When new metric instruments are registered, requiring the sink to accommodate them.
sink.updateMeasures(registry.getMetricInstruments());
}
BatchRecorder singleSinkRecorder = new BatchRecorderImpl(sink, allowedInstruments);
registrations.add(sink.registerBatchCallback(
() -> callback.accept(singleSinkRecorder), metricInstruments));
}
return () -> {
for (MetricSink.Registration registration : registrations) {
registration.close();
}
};
}
/** Recorder for instrument values produced by a batch callback. */
static class BatchRecorderImpl implements BatchRecorder {
private final MetricSink sink;
private final BitSet allowedInstruments;
BatchRecorderImpl(MetricSink sink, BitSet allowedInstruments) {
this.sink = checkNotNull(sink, "sink");
this.allowedInstruments = checkNotNull(allowedInstruments, "allowedInstruments");
}
@Override
public void recordLongGauge(LongGaugeMetricInstrument metricInstrument, long value,
List<String> requiredLabelValues, List<String> optionalLabelValues) {
checkArgument(allowedInstruments.get(metricInstrument.getIndex()),
"Instrument was not listed when registering callback: %s", metricInstrument);
checkArgument(requiredLabelValues != null
&& requiredLabelValues.size() == metricInstrument.getRequiredLabelKeys().size(),
"Incorrect number of required labels provided. Expected: %s",
metricInstrument.getRequiredLabelKeys().size());
checkArgument(optionalLabelValues != null
&& optionalLabelValues.size() == metricInstrument.getOptionalLabelKeys().size(),
"Incorrect number of optional labels provided. Expected: %s",
metricInstrument.getOptionalLabelKeys().size());
// Registering the callback checked that the instruments were be present in sink.
sink.recordLongGauge(metricInstrument, value, requiredLabelValues, optionalLabelValues);
}
}
}

View File

@ -16,6 +16,8 @@
package io.grpc.internal;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@ -28,6 +30,7 @@ import com.google.common.collect.ImmutableList;
import io.grpc.DoubleCounterMetricInstrument;
import io.grpc.DoubleHistogramMetricInstrument;
import io.grpc.LongCounterMetricInstrument;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.LongHistogramMetricInstrument;
import io.grpc.MetricInstrumentRegistry;
import io.grpc.MetricInstrumentRegistryAccessor;
@ -40,6 +43,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
/**
* Unit test for {@link MetricRecorderImpl}.
@ -72,6 +76,9 @@ public class MetricRecorderImplTest {
private final LongHistogramMetricInstrument longHistogramInstrument =
registry.registerLongHistogram("histogram2", DESCRIPTION, UNIT,
Collections.emptyList(), REQUIRED_LABEL_KEYS, OPTIONAL_LABEL_KEYS, ENABLED);
private final LongGaugeMetricInstrument longGaugeInstrument =
registry.registerLongGauge("gauge0", DESCRIPTION, UNIT, REQUIRED_LABEL_KEYS,
OPTIONAL_LABEL_KEYS, ENABLED);
private MetricRecorder recorder;
@Before
@ -113,6 +120,34 @@ public class MetricRecorderImplTest {
verify(mockSink, never()).updateMeasures(registry.getMetricInstruments());
}
@Test
public void recordCallback() {
MetricSink.Registration mockRegistration = mock(MetricSink.Registration.class);
when(mockSink.getMeasuresSize()).thenReturn(5);
when(mockSink.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument)))
.thenReturn(mockRegistration);
MetricRecorder.Registration registration = recorder.registerBatchCallback((recorder) -> {
recorder.recordLongGauge(
longGaugeInstrument, 99, REQUIRED_LABEL_VALUES, OPTIONAL_LABEL_VALUES);
}, longGaugeInstrument);
ArgumentCaptor<Runnable> callbackCaptor = ArgumentCaptor.forClass(Runnable.class);
verify(mockSink, times(2))
.registerBatchCallback(callbackCaptor.capture(), eq(longGaugeInstrument));
callbackCaptor.getValue().run();
// Only once, for the one sink that called the callback.
verify(mockSink).recordLongGauge(
longGaugeInstrument, 99, REQUIRED_LABEL_VALUES, OPTIONAL_LABEL_VALUES);
verify(mockRegistration, never()).close();
registration.close();
verify(mockRegistration, times(2)).close();
verify(mockSink, never()).updateMeasures(registry.getMetricInstruments());
}
@Test
public void newRegisteredMetricUpdateMeasures() {
// Sink is initialized with zero measures, should trigger updateMeasures() on sinks
@ -145,6 +180,16 @@ public class MetricRecorderImplTest {
verify(mockSink, times(8)).updateMeasures(registry.getMetricInstruments());
verify(mockSink, times(2)).recordLongHistogram(eq(longHistogramInstrument), eq(99L),
eq(REQUIRED_LABEL_VALUES), eq(OPTIONAL_LABEL_VALUES));
// Callback
when(mockSink.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument)))
.thenReturn(mock(MetricSink.Registration.class));
MetricRecorder.Registration registration = recorder.registerBatchCallback(
(recorder) -> { }, longGaugeInstrument);
verify(mockSink, times(10)).updateMeasures(registry.getMetricInstruments());
verify(mockSink, times(2))
.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument));
registration.close();
}
@Test(expected = IllegalArgumentException.class)
@ -179,6 +224,26 @@ public class MetricRecorderImplTest {
OPTIONAL_LABEL_VALUES);
}
@Test
public void recordLongGaugeMismatchedRequiredLabelValues() {
when(mockSink.getMeasuresSize()).thenReturn(4);
when(mockSink.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument)))
.thenReturn(mock(MetricSink.Registration.class));
MetricRecorder.Registration registration = recorder.registerBatchCallback((recorder) -> {
assertThrows(
IllegalArgumentException.class,
() -> recorder.recordLongGauge(
longGaugeInstrument, 99, ImmutableList.of(), OPTIONAL_LABEL_VALUES));
}, longGaugeInstrument);
ArgumentCaptor<Runnable> callbackCaptor = ArgumentCaptor.forClass(Runnable.class);
verify(mockSink, times(2))
.registerBatchCallback(callbackCaptor.capture(), eq(longGaugeInstrument));
callbackCaptor.getValue().run();
registration.close();
}
@Test(expected = IllegalArgumentException.class)
public void addDoubleCounterMismatchedOptionalLabelValues() {
when(mockSink.getMeasuresSize()).thenReturn(4);
@ -210,4 +275,24 @@ public class MetricRecorderImplTest {
recorder.recordLongHistogram(longHistogramInstrument, 99, REQUIRED_LABEL_VALUES,
ImmutableList.of());
}
@Test
public void recordLongGaugeMismatchedOptionalLabelValues() {
when(mockSink.getMeasuresSize()).thenReturn(4);
when(mockSink.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument)))
.thenReturn(mock(MetricSink.Registration.class));
MetricRecorder.Registration registration = recorder.registerBatchCallback((recorder) -> {
assertThrows(
IllegalArgumentException.class,
() -> recorder.recordLongGauge(
longGaugeInstrument, 99, REQUIRED_LABEL_VALUES, ImmutableList.of()));
}, longGaugeInstrument);
ArgumentCaptor<Runnable> callbackCaptor = ArgumentCaptor.forClass(Runnable.class);
verify(mockSink, times(2))
.registerBatchCallback(callbackCaptor.capture(), eq(longGaugeInstrument));
callbackCaptor.getValue().run();
registration.close();
}
}

View File

@ -21,19 +21,24 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.grpc.CallbackMetricInstrument;
import io.grpc.DoubleCounterMetricInstrument;
import io.grpc.DoubleHistogramMetricInstrument;
import io.grpc.LongCounterMetricInstrument;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.LongHistogramMetricInstrument;
import io.grpc.MetricInstrument;
import io.grpc.MetricSink;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.DoubleCounter;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.api.metrics.ObservableMeasurement;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
@ -142,6 +147,48 @@ final class OpenTelemetryMetricSink implements MetricSink {
histogram.record(value, attributes);
}
@Override
public void recordLongGauge(LongGaugeMetricInstrument metricInstrument, long value,
List<String> requiredLabelValues, List<String> optionalLabelValues) {
MeasuresData instrumentData = measures.get(metricInstrument.getIndex());
if (instrumentData == null) {
// Disabled metric
return;
}
Attributes attributes = createAttributes(metricInstrument.getRequiredLabelKeys(),
metricInstrument.getOptionalLabelKeys(), requiredLabelValues, optionalLabelValues,
instrumentData.getOptionalLabelsBitSet());
ObservableLongMeasurement gauge = (ObservableLongMeasurement) instrumentData.getMeasure();
gauge.record(value, attributes);
}
@Override
public Registration registerBatchCallback(Runnable callback,
CallbackMetricInstrument... metricInstruments) {
List<ObservableMeasurement> measurements = new ArrayList<>(metricInstruments.length);
for (CallbackMetricInstrument metricInstrument: metricInstruments) {
MeasuresData instrumentData = measures.get(metricInstrument.getIndex());
if (instrumentData == null) {
// Disabled metric
continue;
}
if (!(instrumentData.getMeasure() instanceof ObservableMeasurement)) {
logger.log(Level.FINE, "Unsupported metric instrument type : {0} {1}",
new Object[] {metricInstrument, instrumentData.getMeasure().getClass()});
continue;
}
measurements.add((ObservableMeasurement) instrumentData.getMeasure());
}
if (measurements.isEmpty()) {
return () -> { };
}
ObservableMeasurement first = measurements.get(0);
measurements.remove(0);
BatchCallback closeable = openTelemetryMeter.batchCallback(
callback, first, measurements.toArray(new ObservableMeasurement[0]));
return closeable::close;
}
@Override
public void updateMeasures(List<MetricInstrument> instruments) {
synchronized (lock) {
@ -203,6 +250,12 @@ final class OpenTelemetryMetricSink implements MetricSink {
.setDescription(description)
.ofLongs()
.build();
} else if (instrument instanceof LongGaugeMetricInstrument) {
openTelemetryMeasure = openTelemetryMeter.gaugeBuilder(name)
.setUnit(unit)
.setDescription(description)
.ofLongs()
.buildObserver();
} else {
logger.log(Level.FINE, "Unsupported metric instrument type : {0}", instrument);
openTelemetryMeasure = null;
@ -241,7 +294,6 @@ final class OpenTelemetryMetricSink implements MetricSink {
return builder.build();
}
static final class MeasuresData {
final BitSet optionalLabelsIndices;
final Object measure;

View File

@ -22,8 +22,10 @@ import com.google.common.collect.ImmutableList;
import io.grpc.DoubleCounterMetricInstrument;
import io.grpc.DoubleHistogramMetricInstrument;
import io.grpc.LongCounterMetricInstrument;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.LongHistogramMetricInstrument;
import io.grpc.MetricInstrument;
import io.grpc.MetricSink;
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.metrics.DoubleCounter;
@ -298,6 +300,79 @@ public class OpenTelemetryMetricSinkTest {
assertThat(openTelemetryTesting.getMetrics()).isEmpty();
}
@Test
public void registerBatchCallback_allDisabled() {
// set up sink with disabled metric
Map<String, Boolean> enabledMetrics = new HashMap<>();
LongGaugeMetricInstrument longGaugeInstrumentDisabled =
new LongGaugeMetricInstrument(0, "disk", "Amount of disk used", "By",
Collections.emptyList(), Collections.emptyList(), false);
// Create sink
sink = new OpenTelemetryMetricSink(testMeter, enabledMetrics, false, Collections.emptyList());
// Invoke updateMeasures
sink.updateMeasures(Arrays.asList(longGaugeInstrumentDisabled));
MetricSink.Registration registration = sink.registerBatchCallback(() -> {
sink.recordLongGauge(
longGaugeInstrumentDisabled, 999, Collections.emptyList(), Collections.emptyList());
}, longGaugeInstrumentDisabled);
assertThat(openTelemetryTesting.getMetrics())
.satisfiesExactlyInAnyOrder();
registration.close();
}
@Test
public void registerBatchCallback_bothEnabledAndDisabled() {
// set up sink with disabled metric
Map<String, Boolean> enabledMetrics = new HashMap<>();
enabledMetrics.put("memory", true);
LongGaugeMetricInstrument longGaugeInstrumentEnabled =
new LongGaugeMetricInstrument(0, "memory", "Amount of memory used", "By",
Collections.emptyList(), Collections.emptyList(), false);
LongGaugeMetricInstrument longGaugeInstrumentDisabled =
new LongGaugeMetricInstrument(1, "disk", "Amount of disk used", "By",
Collections.emptyList(), Collections.emptyList(), false);
// Create sink
sink = new OpenTelemetryMetricSink(testMeter, enabledMetrics, false, Collections.emptyList());
// Invoke updateMeasures
sink.updateMeasures(Arrays.asList(longGaugeInstrumentEnabled, longGaugeInstrumentDisabled));
MetricSink.Registration registration = sink.registerBatchCallback(() -> {
sink.recordLongGauge(
longGaugeInstrumentEnabled, 99, Collections.emptyList(), Collections.emptyList());
sink.recordLongGauge(
longGaugeInstrumentDisabled, 999, Collections.emptyList(), Collections.emptyList());
}, longGaugeInstrumentEnabled, longGaugeInstrumentDisabled);
assertThat(openTelemetryTesting.getMetrics())
.satisfiesExactlyInAnyOrder(
metric ->
assertThat(metric)
.hasInstrumentationScope(InstrumentationScopeInfo.create(
OpenTelemetryConstants.INSTRUMENTATION_SCOPE))
.hasName("memory")
.hasDescription("Amount of memory used")
.hasUnit("By")
.hasLongGaugeSatisfying(
gauge ->
gauge.hasPointsSatisfying(
point ->
point
.hasValue(99))));
// Gauge goes away after close
registration.close();
assertThat(openTelemetryTesting.getMetrics())
.satisfiesExactlyInAnyOrder();
}
@Test
public void recordLabels() {
Map<String, Boolean> enabledMetrics = new HashMap<>();