opentelemetry: Add optional grpc.lb.locality to per-call metrics

The optional label API was added in 4c78a974 and xds_cluster_impl was
plumbed in 077dcbf9.

From gRFC A78:

> ### Optional xDS Locality Label
>
> When xDS is used, it is desirable for some metrics to include an optional
> label indicating which xDS locality the metrics are associated with.
> We want to provide this optional label for the metrics in both the
> existing per-call metrics defined in [A66] and in the new metrics for
> the WRR LB policy, described below.
>
> If locality information is available, the value of this label will be of
> the form `{region="${REGION}", zone="${ZONE}", sub_zone="${SUB_ZONE}"}`,
> where `${REGION}`, `${ZONE}`, and `${SUB_ZONE}` are replaced with the
> actual values.  If no locality information is available, the label will
> be set to the empty string.
>
> #### Per-Call Metrics
>
> To support the locality label in the per-call metrics, we will provide
> a mechanism for LB picker to add optional labels to the call attempt
> tracer.  We will then use this mechanism in the `xds_cluster_impl`
> policy's picker to set the locality label. ...
>
> This label will be available on the following per-call metrics:
> - `grpc.client.attempt.duration`
> - `grpc.client.attempt.sent_total_compressed_message_size`
> - `grpc.client.attempt.rcvd_total_compressed_message_size`
This commit is contained in:
Eric Anderson 2024-05-06 08:59:29 -07:00
parent b6f7b693e7
commit a639175c04
4 changed files with 183 additions and 19 deletions

View File

@ -17,6 +17,7 @@
package io.grpc.opentelemetry;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY;
@ -40,6 +41,8 @@ import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StreamTracer;
import io.opentelemetry.api.common.AttributesBuilder;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
@ -63,6 +66,7 @@ import javax.annotation.concurrent.GuardedBy;
*/
final class OpenTelemetryMetricsModule {
private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
private static final String LOCALITY_LABEL_NAME = "grpc.lb.locality";
public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
ImmutableSet.of(
"grpc.client.attempt.started",
@ -81,11 +85,13 @@ final class OpenTelemetryMetricsModule {
private final OpenTelemetryMetricsResource resource;
private final Supplier<Stopwatch> stopwatchSupplier;
private final boolean localityEnabled;
OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
OpenTelemetryMetricsResource resource) {
OpenTelemetryMetricsResource resource, Collection<String> optionalLabels) {
this.resource = checkNotNull(resource, "resource");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
this.localityEnabled = optionalLabels.contains(LOCALITY_LABEL_NAME);
}
/**
@ -140,6 +146,7 @@ final class OpenTelemetryMetricsModule {
final String fullMethodName;
volatile long outboundWireSize;
volatile long inboundWireSize;
volatile String locality;
long attemptNanos;
Code statusCode;
@ -173,6 +180,13 @@ final class OpenTelemetryMetricsModule {
}
}
/**
 * Captures the {@code grpc.lb.locality} optional label for this attempt.
 *
 * <p>Labels with any other key are ignored. A later call with the locality key replaces the
 * value stored by an earlier one; the stored value is read when the finished attempt is
 * recorded.
 */
@Override
public void addOptionalLabel(String key, String value) {
  // Only the locality label is tracked by this tracer; drop everything else.
  if (!LOCALITY_LABEL_NAME.equals(key)) {
    return;
  }
  locality = value;
}
@Override
public void streamClosed(Status status) {
stopwatch.stop();
@ -192,10 +206,18 @@ final class OpenTelemetryMetricsModule {
}
void recordFinishedAttempt() {
io.opentelemetry.api.common.Attributes attribute =
io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
TARGET_KEY, target,
STATUS_KEY, statusCode.toString());
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, fullMethodName)
.put(TARGET_KEY, target)
.put(STATUS_KEY, statusCode.toString());
if (module.localityEnabled) {
  // Read the volatile field once so the null check and the put observe the same value.
  String savedLocality = locality;
  if (savedLocality == null) {
    // gRFC A78: if no locality information is available, the label is set to the
    // empty string (not a placeholder such as "unknown").
    savedLocality = "";
  }
  builder.put(LOCALITY_KEY, savedLocality);
}
io.opentelemetry.api.common.Attributes attribute = builder.build();
if (module.resource.clientAttemptDurationCounter() != null ) {
module.resource.clientAttemptDurationCounter()
@ -315,7 +337,8 @@ final class OpenTelemetryMetricsModule {
void recordFinishedCall() {
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = new ClientTracer(this, module, null, target, fullMethodName);
ClientTracer tracer =
new ClientTracer(this, module, null, target, fullMethodName);
tracer.attemptNanos = attemptStopwatch.elapsed(TimeUnit.NANOSECONDS);
tracer.statusCode = status.getCode();
tracer.recordFinishedAttempt();
@ -478,8 +501,8 @@ final class OpenTelemetryMetricsModule {
// which is true for all generated methods. Otherwise, programmatically
// created methods result in high cardinality metrics.
final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory(
OpenTelemetryMetricsModule.this, target, recordMethodName(method.getFullMethodName(),
method.isSampledToLocalTracing()));
OpenTelemetryMetricsModule.this, target,
recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()));
ClientCall<ReqT, RespT> call =
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {

View File

@ -84,8 +84,9 @@ public final class OpenTelemetryModule {
this.enableMetrics = ImmutableMap.copyOf(builder.enableMetrics);
this.disableDefault = builder.disableAll;
this.resource = createMetricInstruments(meter, enableMetrics, disableDefault);
this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule(STOPWATCH_SUPPLIER, resource);
this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels);
this.openTelemetryMetricsModule =
new OpenTelemetryMetricsModule(STOPWATCH_SUPPLIER, resource, optionalLabels);
this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels);
}

View File

@ -28,6 +28,9 @@ public final class OpenTelemetryConstants {
public static final AttributeKey<String> TARGET_KEY = AttributeKey.stringKey("grpc.target");
/**
 * Attribute key for the optional {@code grpc.lb.locality} label that may be attached to
 * per-call client attempt metrics.
 */
public static final AttributeKey<String> LOCALITY_KEY =
AttributeKey.stringKey("grpc.lb.locality");
private OpenTelemetryConstants() {
}
}

View File

@ -17,6 +17,7 @@
package io.grpc.opentelemetry;
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY;
@ -52,6 +53,7 @@ import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -158,8 +160,7 @@ public class OpenTelemetryMetricsModuleTest {
public void testClientInterceptors() {
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module =
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
grpcServerRule.getServiceRegistry().addService(
ServerServiceDefinition.builder("package1.service2").addMethod(
method, new ServerCallHandler<String, String>() {
@ -215,8 +216,7 @@ public class OpenTelemetryMetricsModuleTest {
String target = "target:///";
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module =
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
new CallAttemptsTracerFactory(module, target, method.getFullMethodName());
Metadata headers = new Metadata();
@ -243,6 +243,8 @@ public class OpenTelemetryMetricsModuleTest {
.hasAttributes(attributes)
.hasValue(1))));
tracer.addOptionalLabel("grpc.lb.locality", "should-be-ignored");
fakeClock.forwardTime(30, TimeUnit.MILLISECONDS);
tracer.outboundHeaders();
@ -353,8 +355,7 @@ public class OpenTelemetryMetricsModuleTest {
String target = "dns:///example.com";
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module =
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target,
method.getFullMethodName());
@ -779,8 +780,7 @@ public class OpenTelemetryMetricsModuleTest {
String target = "dns:///foo.example.com";
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module =
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target,
method.getFullMethodName());
@ -880,11 +880,142 @@ public class OpenTelemetryMetricsModuleTest {
}
@Test
public void serverBasicMetrics() {
public void clientLocalityMetrics_present() {
String target = "target:///";
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
fakeClock.getStopwatchSupplier(), resource);
fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"));
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
new CallAttemptsTracerFactory(module, target, method.getFullMethodName());
ClientStreamTracer tracer =
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
tracer.addOptionalLabel("grpc.lb.foo", "unimportant");
tracer.addOptionalLabel("grpc.lb.locality", "should-be-overwritten");
tracer.addOptionalLabel("grpc.lb.locality", "the-moon");
tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon");
tracer.streamClosed(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK);
io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
METHOD_KEY, method.getFullMethodName());
io.opentelemetry.api.common.Attributes clientAttributes
= io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
METHOD_KEY, method.getFullMethodName(),
STATUS_KEY, Status.Code.OK.toString());
io.opentelemetry.api.common.Attributes clientAttributesWithLocality
= clientAttributes.toBuilder()
.put(LOCALITY_KEY, "the-moon")
.build();
assertThat(openTelemetryTesting.getMetrics())
.satisfiesExactlyInAnyOrder(
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
.hasLongSumSatisfying(
longSum -> longSum.hasPointsSatisfying(
point -> point.hasAttributes(attributes))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithLocality))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithLocality))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithLocality))),
metric ->
assertThat(metric)
.hasName(CLIENT_CALL_DURATION)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributes))));
}
/**
 * Verifies the per-attempt metrics when the locality label is enabled but no locality was ever
 * reported for the attempt.
 *
 * <p>Per gRFC A78, when no locality information is available the {@code grpc.lb.locality}
 * label must be the empty string. The attempt-started counter and the call-duration histogram
 * are expected to carry no locality label at all.
 */
@Test
public void clientLocalityMetrics_missing() {
  String target = "target:///";
  OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
      enabledMetricsMap, disableDefaultMetrics);
  OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
      fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"));
  OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
      new CallAttemptsTracerFactory(module, target, method.getFullMethodName());

  // No addOptionalLabel() call: the attempt finishes without any locality being set.
  ClientStreamTracer tracer =
      callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
  tracer.streamClosed(Status.OK);
  callAttemptsTracerFactory.callEnded(Status.OK);

  io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
      TARGET_KEY, target,
      METHOD_KEY, method.getFullMethodName());

  io.opentelemetry.api.common.Attributes clientAttributes
      = io.opentelemetry.api.common.Attributes.of(
      TARGET_KEY, target,
      METHOD_KEY, method.getFullMethodName(),
      STATUS_KEY, Status.Code.OK.toString());

  // Missing locality is reported as the empty string, as required by gRFC A78.
  io.opentelemetry.api.common.Attributes clientAttributesWithLocality
      = clientAttributes.toBuilder()
      .put(LOCALITY_KEY, "")
      .build();

  assertThat(openTelemetryTesting.getMetrics())
      .satisfiesExactlyInAnyOrder(
          metric ->
              assertThat(metric)
                  .hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
                  .hasLongSumSatisfying(
                      longSum -> longSum.hasPointsSatisfying(
                          point -> point.hasAttributes(attributes))),
          metric ->
              assertThat(metric)
                  .hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
                  .hasHistogramSatisfying(
                      histogram -> histogram.hasPointsSatisfying(
                          point -> point.hasAttributes(clientAttributesWithLocality))),
          metric ->
              assertThat(metric)
                  .hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
                  .hasHistogramSatisfying(
                      histogram -> histogram.hasPointsSatisfying(
                          point -> point.hasAttributes(clientAttributesWithLocality))),
          metric ->
              assertThat(metric)
                  .hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
                  .hasHistogramSatisfying(
                      histogram -> histogram.hasPointsSatisfying(
                          point -> point.hasAttributes(clientAttributesWithLocality))),
          metric ->
              assertThat(metric)
                  .hasName(CLIENT_CALL_DURATION)
                  .hasHistogramSatisfying(
                      histogram -> histogram.hasPointsSatisfying(
                          point -> point.hasAttributes(clientAttributes))));
}
@Test
public void serverBasicMetrics() {
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory();
ServerStreamTracer tracer =
tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata());
@ -994,6 +1125,12 @@ public class OpenTelemetryMetricsModuleTest {
}
/**
 * Builds a metrics module backed by the test's fake clock with no optional labels enabled.
 *
 * @param resource the metric instruments the module records into
 * @return a module constructed with an empty optional-label list
 */
private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule(
    OpenTelemetryMetricsResource resource) {
  // An empty label list leaves the locality label disabled for the module under test.
  OpenTelemetryMetricsModule metricsModule = new OpenTelemetryMetricsModule(
      fakeClock.getStopwatchSupplier(),
      resource,
      Arrays.<String>asList());
  return metricsModule;
}
static class CallInfo<ReqT, RespT> extends ServerCallInfo<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> methodDescriptor;
private final Attributes attributes;