From a639175c0468877d198d7face0a2916b33cf5fa5 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Mon, 6 May 2024 08:59:29 -0700 Subject: [PATCH] opentelemetry: Add optional grpc.lb.locality to per-call metrics The optional label API was added in 4c78a974 and xds_cluster_impl was plumbed in 077dcbf9. From gRFC A78: > ### Optional xDS Locality Label > > When xDS is used, it is desirable for some metrics to include an optional > label indicating which xDS locality the metrics are associated with. > We want to provide this optional label for the metrics in both the > existing per-call metrics defined in [A66] and in the new metrics for > the WRR LB policy, described below. > > If locality information is available, the value of this label will be of > the form `{region="${REGION}", zone="${ZONE}", sub_zone="${SUB_ZONE}"}`, > where `${REGION}`, `${ZONE}`, and `${SUB_ZONE}` are replaced with the > actual values. If no locality information is available, the label will > be set to the empty string. > > #### Per-Call Metrics > > To support the locality label in the per-call metrics, we will provide > a mechanism for LB picker to add optional labels to the call attempt > tracer. We will then use this mechanism in the `xds_cluster_impl` > policy's picker to set the locality label. ... > > This label will be available on the following per-call metrics: > - `grpc.client.attempt.duration` > - `grpc.client.attempt.sent_total_compressed_message_size` > - `grpc.client.attempt.rcvd_total_compressed_message_size` --- .../OpenTelemetryMetricsModule.java | 39 ++++- .../opentelemetry/OpenTelemetryModule.java | 3 +- .../internal/OpenTelemetryConstants.java | 3 + .../OpenTelemetryMetricsModuleTest.java | 157 ++++++++++++++++-- 4 files changed, 183 insertions(+), 19 deletions(-) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index 9c0d228594..71562c11a4 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -17,6 +17,7 @@ package io.grpc.opentelemetry; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY; @@ -40,6 +41,8 @@ import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StreamTracer; +import io.opentelemetry.api.common.AttributesBuilder; +import java.util.Collection; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; @@ -63,6 +66,7 @@ import javax.annotation.concurrent.GuardedBy; */ final class OpenTelemetryMetricsModule { private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName()); + private static final String LOCALITY_LABEL_NAME = "grpc.lb.locality"; public static final ImmutableSet DEFAULT_PER_CALL_METRICS_SET = ImmutableSet.of( "grpc.client.attempt.started", @@ -81,11 +85,13 @@ final class OpenTelemetryMetricsModule { private final OpenTelemetryMetricsResource resource; private final Supplier stopwatchSupplier; + private final boolean localityEnabled; OpenTelemetryMetricsModule(Supplier stopwatchSupplier, - OpenTelemetryMetricsResource resource) { + OpenTelemetryMetricsResource resource, Collection optionalLabels) { this.resource = checkNotNull(resource, "resource"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); + this.localityEnabled = optionalLabels.contains(LOCALITY_LABEL_NAME); } /** @@ -140,6 +146,7 @@ final class OpenTelemetryMetricsModule { final String fullMethodName; volatile long outboundWireSize; volatile long inboundWireSize; + volatile String locality; long attemptNanos; Code statusCode; @@ -173,6 +180,13 @@ final class OpenTelemetryMetricsModule { } } + @Override + public void addOptionalLabel(String key, String value) { + if (LOCALITY_LABEL_NAME.equals(key)) { + locality = value; + } + } + @Override public void streamClosed(Status status) { stopwatch.stop(); @@ -192,10 +206,18 @@ final class OpenTelemetryMetricsModule { } void recordFinishedAttempt() { - io.opentelemetry.api.common.Attributes attribute = - io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName, - TARGET_KEY, target, - STATUS_KEY, statusCode.toString()); + AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() + .put(METHOD_KEY, fullMethodName) + .put(TARGET_KEY, target) + .put(STATUS_KEY, statusCode.toString()); + if (module.localityEnabled) { + String savedLocality = locality; + if (savedLocality == null) { + savedLocality = "unknown"; + } + builder.put(LOCALITY_KEY, savedLocality); + } + io.opentelemetry.api.common.Attributes attribute = builder.build(); if (module.resource.clientAttemptDurationCounter() != null ) { module.resource.clientAttemptDurationCounter() @@ -315,7 +337,8 @@ final class OpenTelemetryMetricsModule { void recordFinishedCall() { if (attemptsPerCall.get() == 0) { - ClientTracer tracer = new ClientTracer(this, module, null, target, fullMethodName); + ClientTracer tracer = + new ClientTracer(this, module, null, target, fullMethodName); tracer.attemptNanos = attemptStopwatch.elapsed(TimeUnit.NANOSECONDS); tracer.statusCode = status.getCode(); tracer.recordFinishedAttempt(); @@ -478,8 +501,8 @@ final class OpenTelemetryMetricsModule { // which is true for all generated methods. Otherwise, programatically // created methods result in high cardinality metrics. final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory( - OpenTelemetryMetricsModule.this, target, recordMethodName(method.getFullMethodName(), - method.isSampledToLocalTracing())); + OpenTelemetryMetricsModule.this, target, + recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing())); ClientCall call = next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory)); return new SimpleForwardingClientCall(call) { diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryModule.java index 3bb266699d..2623a83169 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryModule.java @@ -84,8 +84,9 @@ public final class OpenTelemetryModule { this.enableMetrics = ImmutableMap.copyOf(builder.enableMetrics); this.disableDefault = builder.disableAll; this.resource = createMetricInstruments(meter, enableMetrics, disableDefault); - this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule(STOPWATCH_SUPPLIER, resource); this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels); + this.openTelemetryMetricsModule = + new OpenTelemetryMetricsModule(STOPWATCH_SUPPLIER, resource, optionalLabels); this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels); } diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java index af84caa8b4..decb9c8d89 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java @@ -28,6 +28,9 @@ public final class OpenTelemetryConstants { public static final AttributeKey TARGET_KEY = AttributeKey.stringKey("grpc.target"); + public static final AttributeKey LOCALITY_KEY = + AttributeKey.stringKey("grpc.lb.locality"); + private OpenTelemetryConstants() { } } diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 8304c13d58..91b8f43e7a 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -17,6 +17,7 @@ package io.grpc.opentelemetry; import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED; +import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY; @@ -52,6 +53,7 @@ import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; import java.io.InputStream; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -158,8 +160,7 @@ public class OpenTelemetryMetricsModuleTest { public void testClientInterceptors() { OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); - OpenTelemetryMetricsModule module = - new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource); + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); grpcServerRule.getServiceRegistry().addService( ServerServiceDefinition.builder("package1.service2").addMethod( method, new ServerCallHandler() { @@ -215,8 +216,7 @@ public class OpenTelemetryMetricsModuleTest { String target = "target:///"; OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); - OpenTelemetryMetricsModule module = - new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource); + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName()); Metadata headers = new Metadata(); @@ -243,6 +243,8 @@ public class OpenTelemetryMetricsModuleTest { .hasAttributes(attributes) .hasValue(1)))); + tracer.addOptionalLabel("grpc.lb.locality", "should-be-ignored"); + fakeClock.forwardTime(30, TimeUnit.MILLISECONDS); tracer.outboundHeaders(); @@ -353,8 +355,7 @@ public class OpenTelemetryMetricsModuleTest { String target = "dns:///example.com"; OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); - OpenTelemetryMetricsModule module = - new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource); + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, method.getFullMethodName()); @@ -779,8 +780,7 @@ public class OpenTelemetryMetricsModuleTest { String target = "dns:///foo.example.com"; OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); - OpenTelemetryMetricsModule module = - new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource); + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, method.getFullMethodName()); @@ -880,11 +880,142 @@ public class OpenTelemetryMetricsModuleTest { } @Test - public void serverBasicMetrics() { + public void clientLocalityMetrics_present() { + String target = "target:///"; OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource); + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality")); + OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = + new CallAttemptsTracerFactory(module, target, method.getFullMethodName()); + + ClientStreamTracer tracer = + callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); + tracer.addOptionalLabel("grpc.lb.foo", "unimportant"); + tracer.addOptionalLabel("grpc.lb.locality", "should-be-overwritten"); + tracer.addOptionalLabel("grpc.lb.locality", "the-moon"); + tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon"); + tracer.streamClosed(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK); + + io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( + TARGET_KEY, target, + METHOD_KEY, method.getFullMethodName()); + + io.opentelemetry.api.common.Attributes clientAttributes + = io.opentelemetry.api.common.Attributes.of( + TARGET_KEY, target, + METHOD_KEY, method.getFullMethodName(), + STATUS_KEY, Status.Code.OK.toString()); + + io.opentelemetry.api.common.Attributes clientAttributesWithLocality + = clientAttributes.toBuilder() + .put(LOCALITY_KEY, "the-moon") + .build(); + + assertThat(openTelemetryTesting.getMetrics()) + .satisfiesExactlyInAnyOrder( + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME) + .hasLongSumSatisfying( + longSum -> longSum.hasPointsSatisfying( + point -> point.hasAttributes(attributes))), + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributesWithLocality))), + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributesWithLocality))), + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributesWithLocality))), + metric -> + assertThat(metric) + .hasName(CLIENT_CALL_DURATION) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributes)))); + } + + @Test + public void clientLocalityMetrics_missing() { + String target = "target:///"; + OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter, + enabledMetricsMap, disableDefaultMetrics); + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality")); + OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = + new CallAttemptsTracerFactory(module, target, method.getFullMethodName()); + + ClientStreamTracer tracer = + callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); + tracer.streamClosed(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK); + + io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( + TARGET_KEY, target, + METHOD_KEY, method.getFullMethodName()); + + io.opentelemetry.api.common.Attributes clientAttributes + = io.opentelemetry.api.common.Attributes.of( + TARGET_KEY, target, + METHOD_KEY, method.getFullMethodName(), + STATUS_KEY, Status.Code.OK.toString()); + + io.opentelemetry.api.common.Attributes clientAttributesWithLocality + = clientAttributes.toBuilder() + .put(LOCALITY_KEY, "unknown") + .build(); + + assertThat(openTelemetryTesting.getMetrics()) + .satisfiesExactlyInAnyOrder( + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME) + .hasLongSumSatisfying( + longSum -> longSum.hasPointsSatisfying( + point -> point.hasAttributes(attributes))), + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributesWithLocality))), + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributesWithLocality))), + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributesWithLocality))), + metric -> + assertThat(metric) + .hasName(CLIENT_CALL_DURATION) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributes)))); + } + + @Test + public void serverBasicMetrics() { + OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter, + enabledMetricsMap, disableDefaultMetrics); + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory(); ServerStreamTracer tracer = tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); @@ -994,6 +1125,12 @@ public class OpenTelemetryMetricsModuleTest { } + private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( + OpenTelemetryMetricsResource resource) { + return new OpenTelemetryMetricsModule( + fakeClock.getStopwatchSupplier(), resource, Arrays.asList()); + } + static class CallInfo extends ServerCallInfo { private final MethodDescriptor methodDescriptor; private final Attributes attributes;