gcp-observability, census: add trace information to logs (#9963)

This commit adds trace information (TraceId, SpanId and TraceSampled)
fields to LogEntry, when both logging and tracing are enabled in
gcp-observability. 

For server-side logs, span information was readily available using
Span.getContext() propagated via `io.grpc.Context`. Similar approach is
not feasible for client-side architecture.

Client SpanContext which has all the information required to be added
to logs is propagated to the logging interceptor via `io.grpc.CallOptions`.
This commit is contained in:
DNVindhya 2023-03-20 14:18:16 -07:00 committed by GitHub
parent efce51be0b
commit 844de39c26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 686 additions and 125 deletions

View File

@ -17,6 +17,7 @@
package io.grpc.census;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
@ -124,8 +125,8 @@ final class CensusTracingModule {
*/
@VisibleForTesting
CallAttemptsTracerFactory newClientCallTracer(
@Nullable Span parentSpan, MethodDescriptor<?, ?> method) {
return new CallAttemptsTracerFactory(parentSpan, method);
@Nullable Span clientSpan, MethodDescriptor<?, ?> method) {
return new CallAttemptsTracerFactory(clientSpan, method);
}
/**
@ -248,17 +249,11 @@ final class CensusTracingModule {
private final Span span;
private final String fullMethodName;
CallAttemptsTracerFactory(@Nullable Span parentSpan, MethodDescriptor<?, ?> method) {
CallAttemptsTracerFactory(@Nullable Span clientSpan, MethodDescriptor<?, ?> method) {
checkNotNull(method, "method");
this.isSampledToLocalTracing = method.isSampledToLocalTracing();
this.fullMethodName = method.getFullMethodName();
this.span =
censusTracer
.spanBuilderWithExplicitParent(
generateTraceSpanName(false, fullMethodName),
parentSpan)
.setRecordEvents(true)
.startSpan();
this.span = clientSpan;
}
@Override
@ -461,13 +456,20 @@ final class CensusTracingModule {
// Safe usage of the unsafe trace API because CONTEXT_SPAN_KEY.get() returns the same value
// as Tracer.getCurrentSpan() except when no value available when the return value is null
// for the direct access and BlankSpan when Tracer API is used.
final CallAttemptsTracerFactory tracerFactory =
newClientCallTracer(
io.opencensus.trace.unsafe.ContextUtils.getValue(Context.current()), method);
Span parentSpan = io.opencensus.trace.unsafe.ContextUtils.getValue(Context.current());
Span clientSpan = censusTracer
.spanBuilderWithExplicitParent(
generateTraceSpanName(false, method.getFullMethodName()),
parentSpan)
.setRecordEvents(true)
.startSpan();
final CallAttemptsTracerFactory tracerFactory = newClientCallTracer(clientSpan, method);
ClientCall<ReqT, RespT> call =
next.newCall(
method,
callOptions.withStreamTracerFactory(tracerFactory));
callOptions.withStreamTracerFactory(tracerFactory)
.withOption(CLIENT_TRACE_SPAN_CONTEXT_KEY, clientSpan.getContext()));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {

View File

@ -26,11 +26,13 @@ import static io.opencensus.contrib.grpc.metrics.RpcMeasureConstants.GRPC_SERVER
import static io.opencensus.contrib.grpc.metrics.RpcMeasureConstants.GRPC_SERVER_STATUS;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallOptions;
import io.opencensus.contrib.grpc.metrics.RpcViewConstants;
import io.opencensus.stats.Aggregation;
import io.opencensus.stats.Measure;
import io.opencensus.stats.Measure.MeasureDouble;
import io.opencensus.stats.View;
import io.opencensus.trace.SpanContext;
import java.util.Arrays;
// TODO(dnvindhya): Remove metric and view definitions from this class once it is moved to
@ -42,6 +44,9 @@ import java.util.Arrays;
@VisibleForTesting
public final class ObservabilityCensusConstants {
public static CallOptions.Key<SpanContext> CLIENT_TRACE_SPAN_CONTEXT_KEY
= CallOptions.Key.createWithDefault("Client span context for tracing", SpanContext.INVALID);
static final Aggregation AGGREGATION_WITH_BYTES_HISTOGRAM =
RpcViewConstants.GRPC_CLIENT_SENT_BYTES_PER_RPC_VIEW.getAggregation();

View File

@ -22,6 +22,7 @@ import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRIES
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRY_DELAY_PER_CALL;
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.TRANSPARENT_RETRIES_PER_CALL;
import static io.grpc.census.internal.ObservabilityCensusConstants.API_LATENCY_PER_CALL;
import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -317,6 +318,10 @@ public class CensusModulesTest {
capturedCallOptions.get().getStreamTracerFactories().get(1)
instanceof CensusStatsModule.CallAttemptsTracerFactory);
// The interceptor adds client SpanContext to CallOptions
assertTrue(capturedCallOptions.get().getOption(CLIENT_TRACE_SPAN_CONTEXT_KEY).isValid());
assertTrue(capturedCallOptions.get().getOption(CLIENT_TRACE_SPAN_CONTEXT_KEY) != null);
// Make the call
Metadata headers = new Metadata();
call.start(mockClientCallListener, headers);
@ -738,12 +743,10 @@ public class CensusModulesTest {
@Test
public void clientBasicTracingDefaultSpan() {
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(null, method);
censusTracing.newClientCallTracer(spyClientSpan, method);
Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
clientStreamTracer.streamCreated(Attributes.EMPTY, headers);
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), ArgumentMatchers.<Span>isNull());
verify(tracer).spanBuilderWithExplicitParent(
eq("Attempt.package1.service2.method3"), eq(spyClientSpan));
verify(spyClientSpan, never()).end(any(EndSpanOptions.class));
@ -797,7 +800,7 @@ public class CensusModulesTest {
@Test
public void clientTracingSampledToLocalSpanStore() {
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(null, sampledMethod);
censusTracing.newClientCallTracer(spyClientSpan, sampledMethod);
callTracer.callEnded(Status.OK);
verify(spyClientSpan).end(
@ -867,10 +870,7 @@ public class CensusModulesTest {
@Test
public void clientStreamNeverCreatedStillRecordTracing() {
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(fakeClientParentSpan, method);
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), same(fakeClientParentSpan));
verify(spyClientSpanBuilder).setRecordEvents(eq(true));
censusTracing.newClientCallTracer(spyClientSpan, method);
callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds"));
verify(spyClientSpan).end(
@ -1046,18 +1046,15 @@ public class CensusModulesTest {
@Test
public void traceHeadersPropagateSpanContext() throws Exception {
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(fakeClientParentSpan, method);
censusTracing.newClientCallTracer(spyClientSpan, method);
Metadata headers = new Metadata();
ClientStreamTracer streamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
streamTracer.streamCreated(Attributes.EMPTY, headers);
verify(mockTracingPropagationHandler).toByteArray(same(fakeAttemptSpanContext));
verifyNoMoreInteractions(mockTracingPropagationHandler);
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), same(fakeClientParentSpan));
verify(tracer).spanBuilderWithExplicitParent(
eq("Attempt.package1.service2.method3"), same(spyClientSpan));
verify(spyClientSpanBuilder).setRecordEvents(eq(true));
verifyNoMoreInteractions(tracer);
assertTrue(headers.containsKey(censusTracing.tracingHeader));

View File

@ -166,7 +166,7 @@ public class CensusTracingAnnotationEventTest {
@Test
public void clientBasicTracingUncompressedSizeAnnotation() {
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(null, method);
censusTracing.newClientCallTracer(spyClientSpan, method);
Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
clientStreamTracer.streamCreated(Attributes.EMPTY, headers);

View File

@ -36,6 +36,7 @@ import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
import io.grpc.gcp.observability.logging.TraceLoggingHelper;
import io.opencensus.common.Duration;
import io.opencensus.contrib.grpc.metrics.RpcViewConstants;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration;
@ -58,7 +59,8 @@ import java.util.stream.Collectors;
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869")
public final class GcpObservability implements AutoCloseable {
private static final int METRICS_EXPORT_INTERVAL = 30;
private static final ImmutableSet<String> SERVICES_TO_EXCLUDE = ImmutableSet.of(
@VisibleForTesting
static final ImmutableSet<String> SERVICES_TO_EXCLUDE = ImmutableSet.of(
"google.logging.v2.LoggingServiceV2", "google.monitoring.v3.MetricService",
"google.devtools.cloudtrace.v2.TraceService");
private static GcpObservability instance = null;
@ -77,9 +79,11 @@ public final class GcpObservability implements AutoCloseable {
if (instance == null) {
GlobalLocationTags globalLocationTags = new GlobalLocationTags();
ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
TraceLoggingHelper traceLoggingHelper = new TraceLoggingHelper(
observabilityConfig.getProjectId());
Sink sink = new GcpLogSink(observabilityConfig.getProjectId(),
globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(),
SERVICES_TO_EXCLUDE);
globalLocationTags.getLocationTags(), observabilityConfig,
SERVICES_TO_EXCLUDE, traceLoggingHelper);
LogHelper helper = new LogHelper(sink);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.getInstance(observabilityConfig);
instance = grpcInit(sink, observabilityConfig,

View File

@ -16,6 +16,8 @@
package io.grpc.gcp.observability.interceptors;
import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import io.grpc.CallOptions;
@ -33,6 +35,7 @@ import io.grpc.Status;
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.opencensus.trace.SpanContext;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -92,6 +95,7 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
// Get the stricter deadline to calculate the timeout once the call starts
final Deadline deadline = LogHelper.min(callOptions.getDeadline(),
Context.current().getDeadline());
final SpanContext clientSpanContext = callOptions.getOption(CLIENT_TRACE_SPAN_CONTEXT_KEY);
FilterParams filterParams = filterHelper.logRpcMethod(method.getFullMethodName(), true);
if (!filterParams.log()) {
@ -122,7 +126,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
maxHeaderBytes,
EventLogger.CLIENT,
callId,
null);
null,
clientSpanContext);
} catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events.
// This way we can catch both expected and unexpected exceptions instead of re-throwing
@ -148,7 +153,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
message,
maxMessageBytes,
EventLogger.CLIENT,
callId);
callId,
clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
@ -168,7 +174,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
maxHeaderBytes,
EventLogger.CLIENT,
callId,
LogHelper.getPeerAddress(getAttributes()));
LogHelper.getPeerAddress(getAttributes()),
clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
@ -189,7 +196,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
maxHeaderBytes,
EventLogger.CLIENT,
callId,
LogHelper.getPeerAddress(getAttributes()));
LogHelper.getPeerAddress(getAttributes()),
clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
@ -212,7 +220,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
message,
maxMessageBytes,
EventLogger.CLIENT,
callId);
callId,
clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
@ -229,7 +238,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
methodName,
authority,
EventLogger.CLIENT,
callId);
callId,
clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
@ -246,7 +256,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
methodName,
authority,
EventLogger.CLIENT,
callId);
callId,
clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}

View File

@ -31,6 +31,9 @@ import io.grpc.Status;
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.opencensus.trace.Span;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.unsafe.ContextHandleUtils;
import java.net.SocketAddress;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -91,6 +94,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
Deadline deadline = Context.current().getDeadline();
final Duration timeout = deadline == null ? null
: Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));
Span span = ContextHandleUtils.getValue(ContextHandleUtils.currentContext());
final SpanContext serverSpanContext = span == null ? SpanContext.INVALID : span.getContext();
FilterParams filterParams =
filterHelper.logRpcMethod(call.getMethodDescriptor().getFullMethodName(), false);
@ -113,7 +118,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
maxHeaderBytes,
EventLogger.SERVER,
callId,
peerAddress);
peerAddress,
serverSpanContext);
} catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events.
// This way we can catch both expected and unexpected exceptions instead of re-throwing
@ -139,7 +145,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
maxHeaderBytes,
EventLogger.SERVER,
callId,
null);
null,
serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
@ -160,7 +167,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
message,
maxMessageBytes,
EventLogger.SERVER,
callId);
callId,
serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
@ -181,7 +189,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
maxHeaderBytes,
EventLogger.SERVER,
callId,
null);
null,
serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
@ -206,7 +215,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
message,
maxMessageBytes,
EventLogger.SERVER,
callId);
callId,
serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
@ -223,7 +233,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
methodName,
authority,
EventLogger.SERVER,
callId);
callId,
serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
@ -240,7 +251,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
methodName,
authority,
EventLogger.SERVER,
callId);
callId,
serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}

View File

@ -36,6 +36,7 @@ import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.observabilitylog.v1.Payload;
import io.opencensus.trace.SpanContext;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
@ -88,7 +89,8 @@ public class LogHelper {
GrpcLogRecord.EventLogger eventLogger,
String callId,
// null on client side
@Nullable SocketAddress peerAddress) {
@Nullable SocketAddress peerAddress,
SpanContext spanContext) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(authority, "authority");
@ -114,7 +116,7 @@ public class LogHelper {
if (peerAddress != null) {
logEntryBuilder.setPeer(socketAddressToProto(peerAddress));
}
sink.write(logEntryBuilder.build());
sink.write(logEntryBuilder.build(), spanContext);
}
/**
@ -129,7 +131,8 @@ public class LogHelper {
int maxHeaderBytes,
GrpcLogRecord.EventLogger eventLogger,
String callId,
@Nullable SocketAddress peerAddress) {
@Nullable SocketAddress peerAddress,
SpanContext spanContext) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(authority, "authority");
@ -155,7 +158,7 @@ public class LogHelper {
if (peerAddress != null) {
logEntryBuilder.setPeer(socketAddressToProto(peerAddress));
}
sink.write(logEntryBuilder.build());
sink.write(logEntryBuilder.build(), spanContext);
}
/**
@ -171,7 +174,8 @@ public class LogHelper {
int maxHeaderBytes,
GrpcLogRecord.EventLogger eventLogger,
String callId,
@Nullable SocketAddress peerAddress) {
@Nullable SocketAddress peerAddress,
SpanContext spanContext) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(authority, "authority");
@ -205,7 +209,7 @@ public class LogHelper {
if (peerAddress != null) {
logEntryBuilder.setPeer(socketAddressToProto(peerAddress));
}
sink.write(logEntryBuilder.build());
sink.write(logEntryBuilder.build(), spanContext);
}
/**
@ -220,7 +224,8 @@ public class LogHelper {
T message,
int maxMessageBytes,
EventLogger eventLogger,
String callId) {
String callId,
SpanContext spanContext) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(authority, "authority");
@ -260,7 +265,7 @@ public class LogHelper {
logEntryBuilder.setPayload(pair.payloadBuilder)
.setPayloadTruncated(pair.truncated);
}
sink.write(logEntryBuilder.build());
sink.write(logEntryBuilder.build(), spanContext);
}
/**
@ -272,7 +277,8 @@ public class LogHelper {
String methodName,
String authority,
GrpcLogRecord.EventLogger eventLogger,
String callId) {
String callId,
SpanContext spanContext) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(authority, "authority");
@ -286,7 +292,7 @@ public class LogHelper {
.setType(EventType.CLIENT_HALF_CLOSE)
.setLogger(eventLogger)
.setCallId(callId);
sink.write(logEntryBuilder.build());
sink.write(logEntryBuilder.build(), spanContext);
}
/**
@ -298,7 +304,8 @@ public class LogHelper {
String methodName,
String authority,
GrpcLogRecord.EventLogger eventLogger,
String callId) {
String callId,
SpanContext spanContext) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(authority, "authority");
@ -312,7 +319,7 @@ public class LogHelper {
.setType(EventType.CANCEL)
.setLogger(eventLogger)
.setCallId(callId);
sink.write(logEntryBuilder.build());
sink.write(logEntryBuilder.build(), spanContext);
}
// TODO(DNVindhya): Evaluate if we need following clause for metadata logging in GcpObservability

View File

@ -33,8 +33,10 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.util.JsonFormat;
import io.grpc.Internal;
import io.grpc.gcp.observability.ObservabilityConfig;
import io.grpc.internal.JsonParser;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.opencensus.trace.SpanContext;
import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
@ -67,11 +69,16 @@ public class GcpLogSink implements Sink {
* logging APIs also uses gRPC. */
private volatile Logging gcpLoggingClient;
private final Collection<String> servicesToExclude;
private final boolean isTraceEnabled;
private final TraceLoggingHelper traceLoggingHelper;
@VisibleForTesting
GcpLogSink(Logging loggingClient, String projectId, Map<String, String> locationTags,
Map<String, String> customTags, Collection<String> servicesToExclude) {
this(projectId, locationTags, customTags, servicesToExclude);
ObservabilityConfig config, Collection<String> servicesToExclude,
TraceLoggingHelper traceLoggingHelper) {
this(projectId, locationTags, config, servicesToExclude, traceLoggingHelper);
this.gcpLoggingClient = loggingClient;
}
@ -82,11 +89,14 @@ public class GcpLogSink implements Sink {
* @param servicesToExclude service names for which log entries should not be generated
*/
public GcpLogSink(String projectId, Map<String, String> locationTags,
Map<String, String> customTags, Collection<String> servicesToExclude) {
ObservabilityConfig config, Collection<String> servicesToExclude,
TraceLoggingHelper traceLoggingHelper) {
this.projectId = projectId;
this.customTags = getCustomTags(customTags, locationTags, projectId);
this.customTags = getCustomTags(config.getCustomTags(), locationTags, projectId);
this.kubernetesResource = getResource(locationTags);
this.servicesToExclude = checkNotNull(servicesToExclude, "servicesToExclude");
this.isTraceEnabled = config.isEnableCloudTracing();
this.traceLoggingHelper = traceLoggingHelper;
}
/**
@ -95,7 +105,7 @@ public class GcpLogSink implements Sink {
* @param logProto gRPC logging proto containing the message to be logged
*/
@Override
public void write(GrpcLogRecord logProto) {
public void write(GrpcLogRecord logProto, SpanContext spanContext) {
if (gcpLoggingClient == null) {
synchronized (this) {
if (gcpLoggingClient == null) {
@ -122,7 +132,10 @@ public class GcpLogSink implements Sink {
if (!customTags.isEmpty()) {
grpcLogEntryBuilder.setLabels(customTags);
}
addTraceData(grpcLogEntryBuilder, spanContext);
grpcLogEntry = grpcLogEntryBuilder.build();
synchronized (this) {
logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", eventType);
gcpLoggingClient.write(Collections.singleton(grpcLogEntry));
@ -139,6 +152,13 @@ public class GcpLogSink implements Sink {
}
}
void addTraceData(LogEntry.Builder builder, SpanContext spanContext) {
if (!isTraceEnabled) {
return;
}
traceLoggingHelper.enhanceLogEntry(builder, spanContext);
}
Logging createLoggingClient() {
LoggingOptions.Builder builder = LoggingOptions.newBuilder();
if (!Strings.isNullOrEmpty(projectId)) {

View File

@ -18,6 +18,7 @@ package io.grpc.gcp.observability.logging;
import io.grpc.Internal;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.opencensus.trace.SpanContext;
/**
* Sink for GCP observability.
@ -27,7 +28,7 @@ public interface Sink {
/**
* Writes the {@code message} to the destination.
*/
void write(GrpcLogRecord message);
void write(GrpcLogRecord message, SpanContext spanContext);
/**
* Closes the sink.

View File

@ -0,0 +1,49 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.gcp.observability.logging;
import com.google.cloud.logging.LogEntry;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Internal;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.TraceId;
@Internal
public class TraceLoggingHelper {
private final String tracePrefix;
public TraceLoggingHelper(String projectId) {
this.tracePrefix = "projects/" + projectId + "/traces/";;
}
@VisibleForTesting
void enhanceLogEntry(LogEntry.Builder builder, SpanContext spanContext) {
addTracingData(tracePrefix, spanContext, builder);
}
private static void addTracingData(
String tracePrefix, SpanContext spanContext, LogEntry.Builder builder) {
builder.setTrace(formatTraceId(tracePrefix, spanContext.getTraceId()));
builder.setSpanId(spanContext.getSpanId().toLowerBase16());
builder.setTraceSampled(spanContext.getTraceOptions().isSampled());
}
private static String formatTraceId(String tracePrefix, TraceId traceId) {
return tracePrefix + traceId.toLowerBase16();
}
}

View File

@ -38,9 +38,11 @@ import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
import io.grpc.gcp.observability.logging.TraceLoggingHelper;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
import io.opencensus.trace.SpanContext;
import java.io.IOException;
import java.util.Collections;
import java.util.regex.Pattern;
@ -49,7 +51,9 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
@RunWith(JUnit4.class)
@ -111,10 +115,12 @@ public class LoggingTest {
@Override
public void run() {
ObservabilityConfig config = mock(ObservabilityConfig.class);
when(config.getCustomTags()).thenReturn(CUSTOM_TAGS);
Sink sink =
new GcpLogSink(
PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, Collections.emptySet());
ObservabilityConfig config = mock(ObservabilityConfig.class);
PROJECT_ID, LOCATION_TAGS, config, Collections.emptySet(),
mock(TraceLoggingHelper.class));
LogHelper spyLogHelper = spy(new LogHelper(sink));
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory =
@ -237,7 +243,9 @@ public class LoggingTest {
// = 8
assertThat(Mockito.mockingDetails(mockSink).getInvocations().size()).isEqualTo(12);
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
verify(mockSink, times(12)).write(captor.capture());
verify(mockSink, times(12)).write(captor.capture(),
AdditionalMatchers.or(ArgumentMatchers.isNull(),
ArgumentMatchers.any(SpanContext.class)));
for (GrpcLogRecord record : captor.getAllValues()) {
assertThat(record.getType()).isInstanceOf(GrpcLogRecord.EventType.class);
assertThat(record.getLogger()).isInstanceOf(GrpcLogRecord.EventLogger.class);

View File

@ -17,6 +17,7 @@
package io.grpc.gcp.observability.interceptors;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;
import static io.grpc.gcp.observability.interceptors.LogHelperTest.BYTEARRAY_MARSHALLER;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
@ -52,6 +53,11 @@ import io.grpc.internal.NoopClientCall;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.SpanId;
import io.opencensus.trace.TraceId;
import io.opencensus.trace.TraceOptions;
import io.opencensus.trace.Tracestate;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -83,6 +89,14 @@ public class InternalLoggingChannelInterceptorTest {
public final MockitoRule mockito = MockitoJUnit.rule();
private static final Charset US_ASCII = StandardCharsets.US_ASCII;
private static final SpanContext DEFAULT_CLIENT_SPAN_CONTEXT = SpanContext.INVALID;
private static final SpanContext SPAN_CONTEXT = SpanContext.create(
TraceId.fromLowerBase16("4c6af40c499951eb7de2777ba1e4fefa"),
SpanId.fromLowerBase16("de52e84d13dd232d"),
TraceOptions.builder().setIsSampled(true).build(),
Tracestate.builder().build());
private static final CallOptions CALL_OPTIONS_WITH_SPAN_CONTEXT =
CallOptions.DEFAULT.withOption(CLIENT_TRACE_SPAN_CONTEXT_KEY, SPAN_CONTEXT);
private InternalLoggingChannelInterceptor.Factory factory;
private AtomicReference<ClientCall.Listener<byte[]>> interceptedListener;
@ -192,7 +206,8 @@ public class InternalLoggingChannelInterceptorTest {
eq(filterParams.headerBytes()),
eq(EventLogger.CLIENT),
anyString(),
ArgumentMatchers.isNull());
ArgumentMatchers.isNull(),
eq(DEFAULT_CLIENT_SPAN_CONTEXT));
verifyNoMoreInteractions(mockLogHelper);
assertSame(clientInitial, actualClientInitial.get());
}
@ -213,7 +228,8 @@ public class InternalLoggingChannelInterceptorTest {
eq(filterParams.headerBytes()),
eq(EventLogger.CLIENT),
anyString(),
same(peer));
same(peer),
any(SpanContext.class));
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onHeaders(same(serverInitial));
}
@ -234,7 +250,8 @@ public class InternalLoggingChannelInterceptorTest {
same(request),
eq(filterParams.messageBytes()),
eq(EventLogger.CLIENT),
anyString());
anyString(),
any(SpanContext.class));
verifyNoMoreInteractions(mockLogHelper);
assertSame(request, actualRequest.get());
}
@ -251,7 +268,8 @@ public class InternalLoggingChannelInterceptorTest {
eq("method"),
eq("the-authority"),
eq(EventLogger.CLIENT),
anyString());
anyString(),
any(SpanContext.class));
halfCloseCalled.get(1, TimeUnit.MILLISECONDS);
verifyNoMoreInteractions(mockLogHelper);
}
@ -272,7 +290,8 @@ public class InternalLoggingChannelInterceptorTest {
same(response),
eq(filterParams.messageBytes()),
eq(EventLogger.CLIENT),
anyString());
anyString(),
any(SpanContext.class));
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onMessage(same(response));
}
@ -295,7 +314,8 @@ public class InternalLoggingChannelInterceptorTest {
eq(filterParams.headerBytes()),
eq(EventLogger.CLIENT),
anyString(),
same(peer));
same(peer),
any(SpanContext.class));
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onClose(same(status), same(trailers));
}
@ -312,7 +332,8 @@ public class InternalLoggingChannelInterceptorTest {
eq("method"),
eq("the-authority"),
eq(EventLogger.CLIENT),
anyString());
anyString(),
any(SpanContext.class));
cancelCalled.get(1, TimeUnit.MILLISECONDS);
}
}
@ -363,7 +384,8 @@ public class InternalLoggingChannelInterceptorTest {
any(GrpcLogRecord.EventLogger.class),
anyString(),
AdditionalMatchers.or(ArgumentMatchers.isNull(),
ArgumentMatchers.any()));
ArgumentMatchers.any()),
any(SpanContext.class));
Duration timeout = callOptTimeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
@ -422,7 +444,8 @@ public class InternalLoggingChannelInterceptorTest {
any(GrpcLogRecord.EventLogger.class),
anyString(),
AdditionalMatchers.or(ArgumentMatchers.isNull(),
ArgumentMatchers.any()));
ArgumentMatchers.any()),
any(SpanContext.class));
Duration timeout = contextTimeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
@ -484,7 +507,8 @@ public class InternalLoggingChannelInterceptorTest {
any(GrpcLogRecord.EventLogger.class),
anyString(),
AdditionalMatchers.or(ArgumentMatchers.isNull(),
ArgumentMatchers.any()));
ArgumentMatchers.any()),
any(SpanContext.class));
Duration timeout = timeoutCaptor.getValue();
assertThat(LogHelper.min(contextDeadline, callOptionsDeadline))
.isSameInstanceAs(contextDeadline);
@ -633,4 +657,172 @@ public class InternalLoggingChannelInterceptorTest {
assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(7);
}
}
@Test
public void clientSpanContextLogged_contextSetViaCallOption() {
Channel channel = new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
return new NoopClientCall<RequestT, ResponseT>() {
@Override
@SuppressWarnings("unchecked")
public void start(Listener<ResponseT> responseListener, Metadata headers) {
interceptedListener.set((Listener<byte[]>) responseListener);
actualClientInitial.set(headers);
}
@Override
public void sendMessage(RequestT message) {
actualRequest.set(message);
}
@Override
public void cancel(String message, Throwable cause) {
cancelCalled.set(null);
}
@Override
public void halfClose() {
halfCloseCalled.set(null);
}
@Override
public Attributes getAttributes() {
return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();
}
};
}
@Override
public String authority() {
return "the-authority";
}
};
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.logRpcMethod(method.getFullMethodName(), true))
.thenReturn(FilterParams.create(true, 10, 10));
ClientCall<byte[], byte[]> interceptedLoggingCall =
factory.create()
.interceptCall(method,
CALL_OPTIONS_WITH_SPAN_CONTEXT,
channel);
{
interceptedLoggingCall.start(mockListener, new Metadata());
ArgumentCaptor<SpanContext> callOptSpanContextCaptor = ArgumentCaptor.forClass(
SpanContext.class);
verify(mockLogHelper, times(1))
.logClientHeader(
anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
ArgumentMatchers.isNull(),
any(Metadata.class),
anyInt(),
any(GrpcLogRecord.EventLogger.class),
anyString(),
AdditionalMatchers.or(ArgumentMatchers.isNull(),
ArgumentMatchers.any()),
callOptSpanContextCaptor.capture());
SpanContext spanContext = callOptSpanContextCaptor.getValue();
assertThat(spanContext).isEqualTo(SPAN_CONTEXT);
}
}
@Test
public void clientSpanContextLogged_contextNotSetViaCallOption() {
Channel channel = new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
return new NoopClientCall<RequestT, ResponseT>() {
@Override
@SuppressWarnings("unchecked")
public void start(Listener<ResponseT> responseListener, Metadata headers) {
interceptedListener.set((Listener<byte[]>) responseListener);
actualClientInitial.set(headers);
}
@Override
public void sendMessage(RequestT message) {
actualRequest.set(message);
}
@Override
public void cancel(String message, Throwable cause) {
cancelCalled.set(null);
}
@Override
public void halfClose() {
halfCloseCalled.set(null);
}
@Override
public Attributes getAttributes() {
return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();
}
};
}
@Override
public String authority() {
return "the-authority";
}
};
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.logRpcMethod(method.getFullMethodName(), true))
.thenReturn(FilterParams.create(true, 10, 10));
ClientCall<byte[], byte[]> interceptedLoggingCall =
factory.create()
.interceptCall(method,
CallOptions.DEFAULT,
channel);
{
interceptedLoggingCall.start(mockListener, new Metadata());
ArgumentCaptor<SpanContext> callOptSpanContextCaptor = ArgumentCaptor.forClass(
SpanContext.class);
verify(mockLogHelper, times(1))
.logClientHeader(
anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
ArgumentMatchers.isNull(),
any(Metadata.class),
anyInt(),
any(GrpcLogRecord.EventLogger.class),
anyString(),
AdditionalMatchers.or(ArgumentMatchers.isNull(),
ArgumentMatchers.any()),
callOptSpanContextCaptor.capture());
SpanContext spanContext = callOptSpanContextCaptor.getValue();
assertThat(spanContext).isEqualTo(DEFAULT_CLIENT_SPAN_CONTEXT);
}
}
}

View File

@ -45,6 +45,7 @@ import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.internal.NoopServerCall;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.opencensus.trace.SpanContext;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -171,7 +172,8 @@ public class InternalLoggingServerInterceptorTest {
eq(filterParams.headerBytes()),
eq(EventLogger.SERVER),
anyString(),
same(peer));
same(peer),
eq(SpanContext.INVALID));
verifyNoMoreInteractions(mockLogHelper);
}
@ -191,7 +193,8 @@ public class InternalLoggingServerInterceptorTest {
eq(filterParams.headerBytes()),
eq(EventLogger.SERVER),
anyString(),
ArgumentMatchers.isNull());
ArgumentMatchers.isNull(),
eq(SpanContext.INVALID));
verifyNoMoreInteractions(mockLogHelper);
assertSame(serverInitial, actualServerInitial.get());
}
@ -212,7 +215,8 @@ public class InternalLoggingServerInterceptorTest {
same(request),
eq(filterParams.messageBytes()),
eq(EventLogger.SERVER),
anyString());
anyString(),
eq(SpanContext.INVALID));
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onMessage(same(request));
}
@ -229,7 +233,8 @@ public class InternalLoggingServerInterceptorTest {
eq("method"),
eq("the-authority"),
eq(EventLogger.SERVER),
anyString());
anyString(),
eq(SpanContext.INVALID));
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onHalfClose();
}
@ -250,7 +255,8 @@ public class InternalLoggingServerInterceptorTest {
same(response),
eq(filterParams.messageBytes()),
eq(EventLogger.SERVER),
anyString());
anyString(),
eq(SpanContext.INVALID));
verifyNoMoreInteractions(mockLogHelper);
assertSame(response, actualResponse.get());
}
@ -273,7 +279,8 @@ public class InternalLoggingServerInterceptorTest {
eq(filterParams.headerBytes()),
eq(EventLogger.SERVER),
anyString(),
ArgumentMatchers.isNull());
ArgumentMatchers.isNull(),
eq(SpanContext.INVALID));
verifyNoMoreInteractions(mockLogHelper);
assertSame(status, actualStatus.get());
assertSame(trailers, actualTrailers.get());
@ -291,7 +298,8 @@ public class InternalLoggingServerInterceptorTest {
eq("method"),
eq("the-authority"),
eq(EventLogger.SERVER),
anyString());
anyString(),
eq(SpanContext.INVALID));
verify(mockListener).onCancel();
}
}
@ -342,7 +350,8 @@ public class InternalLoggingServerInterceptorTest {
eq(filterParams.headerBytes()),
eq(EventLogger.SERVER),
anyString(),
ArgumentMatchers.isNull());
ArgumentMatchers.isNull(),
eq(SpanContext.INVALID));
verifyNoMoreInteractions(mockLogHelper);
Duration timeout = timeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))

View File

@ -43,6 +43,11 @@ import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.observabilitylog.v1.Payload;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.SpanId;
import io.opencensus.trace.TraceId;
import io.opencensus.trace.TraceOptions;
import io.opencensus.trace.Tracestate;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -75,11 +80,22 @@ public class LogHelperTest {
Metadata.Key.of("c", Metadata.ASCII_STRING_MARSHALLER);
private static final int HEADER_LIMIT = 10;
private static final int MESSAGE_LIMIT = Integer.MAX_VALUE;
private static final SpanContext CLIENT_SPAN_CONTEXT = SpanContext.create(
TraceId.fromLowerBase16("4c6af40c499951eb7de2777ba1e4fefa"),
SpanId.fromLowerBase16("de52e84d13dd232d"),
TraceOptions.builder().setIsSampled(true).build(),
Tracestate.builder().build());
private static final SpanContext SERVER_SPAN_CONTEXT = SpanContext.create(
TraceId.fromLowerBase16("549a8a64db2d0c757fdf6bb1bfe84e2c"),
SpanId.fromLowerBase16("a5b7704614fe903d"),
TraceOptions.builder().setIsSampled(true).build(),
Tracestate.builder().build());
private final Metadata nonEmptyMetadata = new Metadata();
private final Sink sink = mock(GcpLogSink.class);
private final LogHelper logHelper = new LogHelper(sink);
@Before
public void setUp() {
nonEmptyMetadata.put(KEY_A, DATA_A);
@ -288,8 +304,9 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.CLIENT,
callId,
null);
verify(sink).write(base);
null,
CLIENT_SPAN_CONTEXT);
verify(sink).write(base, CLIENT_SPAN_CONTEXT);
}
// logged on server
@ -304,12 +321,14 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.SERVER,
callId,
peerAddress);
peerAddress,
SERVER_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.setPeer(LogHelper.socketAddressToProto(peerAddress))
.setLogger(EventLogger.SERVER)
.build());
.build(),
SERVER_SPAN_CONTEXT);
}
// timeout is null
@ -324,11 +343,13 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.CLIENT,
callId,
null);
null,
CLIENT_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.setPayload(base.getPayload().toBuilder().clearTimeout().build())
.build());
.build(),
CLIENT_SPAN_CONTEXT);
}
// peerAddress is not null (error on client)
@ -343,7 +364,8 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.CLIENT,
callId,
peerAddress);
peerAddress,
CLIENT_SPAN_CONTEXT);
fail();
} catch (IllegalArgumentException expected) {
assertThat(expected).hasMessageThat().contains("peerAddress can only be specified by server");
@ -386,8 +408,9 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.CLIENT,
callId,
peerAddress);
verify(sink).write(base);
peerAddress,
CLIENT_SPAN_CONTEXT);
verify(sink).write(base, CLIENT_SPAN_CONTEXT);
}
// logged on server
@ -401,12 +424,14 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.SERVER,
callId,
null);
null,
SERVER_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.setLogger(EventLogger.SERVER)
.clearPeer()
.build());
.build(),
SERVER_SPAN_CONTEXT);
}
// peerAddress is not null (error on server)
@ -420,7 +445,8 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.SERVER,
callId,
peerAddress);
peerAddress,
SERVER_SPAN_CONTEXT);
fail();
} catch (IllegalArgumentException expected) {
@ -472,8 +498,9 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.CLIENT,
callId,
peer);
verify(sink).write(base);
peer,
CLIENT_SPAN_CONTEXT);
verify(sink).write(base, CLIENT_SPAN_CONTEXT);
}
// logged on server
@ -488,12 +515,14 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.SERVER,
callId,
null);
null,
SERVER_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.clearPeer()
.setLogger(EventLogger.SERVER)
.build());
.build(),
SERVER_SPAN_CONTEXT);
}
// peer address is null
@ -508,11 +537,13 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.CLIENT,
callId,
null);
null,
CLIENT_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.clearPeer()
.build());
.build(),
CLIENT_SPAN_CONTEXT);
}
// status description is null
@ -527,11 +558,13 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.CLIENT,
callId,
peer);
peer,
CLIENT_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.setPayload(base.getPayload().toBuilder().clearStatusMessage().build())
.build());
.build(),
CLIENT_SPAN_CONTEXT);
}
}
@ -591,8 +624,9 @@ public class LogHelperTest {
message,
MESSAGE_LIMIT,
EventLogger.CLIENT,
callId);
verify(sink).write(base);
callId,
CLIENT_SPAN_CONTEXT);
verify(sink).write(base, CLIENT_SPAN_CONTEXT);
}
// response message, logged on client
{
@ -605,11 +639,13 @@ public class LogHelperTest {
message,
MESSAGE_LIMIT,
EventLogger.CLIENT,
callId);
callId,
CLIENT_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.setType(EventType.SERVER_MESSAGE)
.build());
.build(),
CLIENT_SPAN_CONTEXT);
}
// request message, logged on server
{
@ -622,11 +658,13 @@ public class LogHelperTest {
message,
MESSAGE_LIMIT,
EventLogger.SERVER,
callId);
callId,
SERVER_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.setLogger(EventLogger.SERVER)
.build());
.build(),
SERVER_SPAN_CONTEXT);
}
// response message, logged on server
{
@ -639,12 +677,14 @@ public class LogHelperTest {
message,
MESSAGE_LIMIT,
EventLogger.SERVER,
callId);
callId,
SpanContext.INVALID);
verify(sink).write(
base.toBuilder()
.setType(EventType.SERVER_MESSAGE)
.setLogger(EventLogger.SERVER)
.build());
.build(),
SpanContext.INVALID);
}
// message is not of type : com.google.protobuf.Message or byte[]
{
@ -657,12 +697,14 @@ public class LogHelperTest {
"message",
MESSAGE_LIMIT,
EventLogger.CLIENT,
callId);
callId,
CLIENT_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.clearPayload()
.clearPayloadTruncated()
.build());
.build(),
CLIENT_SPAN_CONTEXT);
}
}

View File

@ -22,6 +22,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.cloud.MonitoredResource;
import com.google.cloud.logging.LogEntry;
@ -31,9 +32,15 @@ import com.google.protobuf.Duration;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.google.protobuf.util.Durations;
import io.grpc.gcp.observability.ObservabilityConfig;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.SpanId;
import io.opencensus.trace.TraceId;
import io.opencensus.trace.TraceOptions;
import io.opencensus.trace.Tracestate;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -105,13 +112,16 @@ public class GcpLogSinkTest {
.build();
@Mock
private Logging mockLogging;
@Mock
private ObservabilityConfig mockConfig;
@Test
@SuppressWarnings("unchecked")
public void verifyWrite() throws Exception {
when(mockConfig.getCustomTags()).thenReturn(CUSTOM_TAGS);
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
CUSTOM_TAGS, Collections.emptySet());
sink.write(LOG_PROTO);
mockConfig, Collections.emptySet(), new TraceLoggingHelper(DEST_PROJECT_NAME));
sink.write(LOG_PROTO, null);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
@ -127,10 +137,11 @@ public class GcpLogSinkTest {
@Test
@SuppressWarnings("unchecked")
public void verifyWriteWithTags() {
when(mockConfig.getCustomTags()).thenReturn(CUSTOM_TAGS);
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
CUSTOM_TAGS, Collections.emptySet());
mockConfig, Collections.emptySet(), new TraceLoggingHelper(DEST_PROJECT_NAME));
MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(LOCATION_TAGS);
sink.write(LOG_PROTO);
sink.write(LOG_PROTO, null);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
@ -150,10 +161,11 @@ public class GcpLogSinkTest {
@SuppressWarnings("unchecked")
public void emptyCustomTags_labelsNotSet() {
Map<String, String> emptyCustomTags = null;
when(mockConfig.getCustomTags()).thenReturn(emptyCustomTags);
Map<String, String> expectedEmptyLabels = new HashMap<>();
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
emptyCustomTags, Collections.emptySet());
sink.write(LOG_PROTO);
mockConfig, Collections.emptySet(), new TraceLoggingHelper(DEST_PROJECT_NAME));
sink.write(LOG_PROTO, null);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
@ -169,12 +181,13 @@ public class GcpLogSinkTest {
@SuppressWarnings("unchecked")
public void emptyCustomTags_setSourceProject() {
Map<String, String> emptyCustomTags = null;
when(mockConfig.getCustomTags()).thenReturn(emptyCustomTags);
String projectId = "PROJECT";
Map<String, String> expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, LOCATION_TAGS,
projectId);
GcpLogSink sink = new GcpLogSink(mockLogging, projectId, LOCATION_TAGS,
emptyCustomTags, Collections.emptySet());
sink.write(LOG_PROTO);
mockConfig, Collections.emptySet(), new TraceLoggingHelper(DEST_PROJECT_NAME));
sink.write(LOG_PROTO, null);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
@ -189,8 +202,8 @@ public class GcpLogSinkTest {
@Test
public void verifyClose() throws Exception {
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
CUSTOM_TAGS, Collections.emptySet());
sink.write(LOG_PROTO);
mockConfig, Collections.emptySet(), new TraceLoggingHelper(DEST_PROJECT_NAME));
sink.write(LOG_PROTO, null);
verify(mockLogging, times(1)).write(anyIterable());
sink.close();
verify(mockLogging).close();
@ -200,8 +213,106 @@ public class GcpLogSinkTest {
@Test
public void verifyExclude() throws Exception {
Sink mockSink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
CUSTOM_TAGS, Collections.singleton("service"));
mockSink.write(LOG_PROTO);
mockConfig, Collections.singleton("service"), new TraceLoggingHelper(DEST_PROJECT_NAME));
mockSink.write(LOG_PROTO, null);
verifyNoInteractions(mockLogging);
}
@Test
@SuppressWarnings("unchecked")
public void verifyNoTraceDataInLogs_withTraceDisabled() throws Exception {
SpanContext validSpanContext = SpanContext.create(
TraceId.fromLowerBase16("4c6af40c499951eb7de2777ba1e4fefa"),
SpanId.fromLowerBase16("de52e84d13dd232d"),
TraceOptions.builder().setIsSampled(true).build(),
Tracestate.builder().build());
TraceLoggingHelper traceLoggingHelper = new TraceLoggingHelper(DEST_PROJECT_NAME);
when(mockConfig.isEnableCloudTracing()).thenReturn(false);
Sink mockSink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
mockConfig, Collections.emptySet(), traceLoggingHelper);
mockSink.write(LOG_PROTO, validSpanContext);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
verify(mockLogging, times(1)).write(logEntrySetCaptor.capture());
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
assertThat(entry.getTrace()).isNull(); // Field not present
assertThat(entry.getSpanId()).isNull(); // Field not present
assertThat(entry.getTraceSampled()).isFalse(); // Default value
assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO);
}
}
@Test
@SuppressWarnings("unchecked")
public void verifyTraceDataInLogs_withValidSpanContext() throws Exception {
CharSequence traceIdSeq = "4c6af40c499951eb7de2777ba1e4fefa";
CharSequence spanIdSeq = "de52e84d13dd232d";
TraceId traceId = TraceId.fromLowerBase16(traceIdSeq);
SpanId spanId = SpanId.fromLowerBase16(spanIdSeq);
boolean traceSampled = true;
SpanContext validSpanContext = SpanContext.create(traceId, spanId,
TraceOptions.builder().setIsSampled(traceSampled).build(),
Tracestate.builder().build());
TraceLoggingHelper traceLoggingHelper = new TraceLoggingHelper(DEST_PROJECT_NAME);
when(mockConfig.isEnableCloudTracing()).thenReturn(true);
Sink mockSink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
mockConfig, Collections.emptySet(), traceLoggingHelper);
mockSink.write(LOG_PROTO, validSpanContext);
String expectedTrace = "projects/" + DEST_PROJECT_NAME + "/traces/" + traceIdSeq;
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
verify(mockLogging, times(1)).write(logEntrySetCaptor.capture());
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
assertThat(entry.getTrace()).isEqualTo(expectedTrace);
assertThat(entry.getSpanId()).isEqualTo("" + spanIdSeq);
assertThat(entry.getTraceSampled()).isEqualTo(traceSampled);
assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO);
}
}
@Test
@SuppressWarnings("unchecked")
public void verifyTraceDataLogs_withNullSpanContext() throws Exception {
TraceLoggingHelper traceLoggingHelper = new TraceLoggingHelper(DEST_PROJECT_NAME);
when(mockConfig.isEnableCloudTracing()).thenReturn(true);
Sink mockSink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
mockConfig, Collections.emptySet(), traceLoggingHelper);
String expectedTrace =
"projects/" + DEST_PROJECT_NAME + "/traces/00000000000000000000000000000000";
String expectedSpanId = "0000000000000000";
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
// Client log with default span context
mockSink.write(LOG_PROTO , SpanContext.INVALID);
verify(mockLogging, times(1)).write(logEntrySetCaptor.capture());
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
assertThat(entry.getTrace()).isEqualTo(expectedTrace);
assertThat(entry.getSpanId()).isEqualTo(expectedSpanId);
assertThat(entry.getTraceSampled()).isFalse();
}
// Server log
GrpcLogRecord serverLogProto = LOG_PROTO.toBuilder().setLogger(EventLogger.SERVER).build();
mockSink.write(serverLogProto , SpanContext.INVALID);
verify(mockLogging, times(2)).write(logEntrySetCaptor.capture());
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
assertThat(entry.getTrace()).isEqualTo(expectedTrace);
assertThat(entry.getSpanId()).isEqualTo(expectedSpanId);
assertThat(entry.getTraceSampled()).isFalse();
}
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.gcp.observability.logging;
import static com.google.common.truth.Truth.assertThat;
import com.google.cloud.logging.LogEntry;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.SpanId;
import io.opencensus.trace.TraceId;
import io.opencensus.trace.TraceOptions;
import io.opencensus.trace.Tracestate;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Tests for {@link TraceLoggingHelper}.
*/
@RunWith(JUnit4.class)
public class TraceLoggingHelperTest {
private static final String PROJECT = "PROJECT";
private static final Tracestate EMPTY_TRACESTATE = Tracestate.builder().build();
private static TraceLoggingHelper traceLoggingHelper;
@Before
public void setUp() {
traceLoggingHelper = new TraceLoggingHelper(PROJECT);
}
@Test
public void enhanceLogEntry_AddSampledSpanContextToLogEntry() {
SpanContext spanContext = SpanContext.create(
TraceId.fromLowerBase16("5ce724c382c136b2a67bb447e6a6bd27"),
SpanId.fromLowerBase16("de52e84d13dd232d"),
TraceOptions.builder().setIsSampled(true).build(),
EMPTY_TRACESTATE);
LogEntry logEntry = getEnhancedLogEntry(traceLoggingHelper, spanContext);
assertThat(logEntry.getTraceSampled()).isTrue();
assertThat(logEntry.getTrace())
.isEqualTo("projects/PROJECT/traces/5ce724c382c136b2a67bb447e6a6bd27");
assertThat(logEntry.getSpanId()).isEqualTo("de52e84d13dd232d");
}
@Test
public void enhanceLogEntry_AddNonSampledSpanContextToLogEntry() {
SpanContext spanContext = SpanContext.create(
TraceId.fromLowerBase16("649a8a64db2d0c757fd06bb1bfe84e2c"),
SpanId.fromLowerBase16("731e102335b7a5a0"),
TraceOptions.builder().setIsSampled(false).build(),
EMPTY_TRACESTATE);
LogEntry logEntry = getEnhancedLogEntry(traceLoggingHelper, spanContext);
assertThat(logEntry.getTraceSampled()).isFalse();
assertThat(logEntry.getTrace())
.isEqualTo("projects/PROJECT/traces/649a8a64db2d0c757fd06bb1bfe84e2c");
assertThat(logEntry.getSpanId()).isEqualTo("731e102335b7a5a0");
}
@Test
public void enhanceLogEntry_AddBlankSpanContextToLogEntry() {
SpanContext spanContext = SpanContext.INVALID;
LogEntry logEntry = getEnhancedLogEntry(traceLoggingHelper, spanContext);
assertThat(logEntry.getTraceSampled()).isFalse();
assertThat(logEntry.getTrace())
.isEqualTo("projects/PROJECT/traces/00000000000000000000000000000000");
assertThat(logEntry.getSpanId()).isEqualTo("0000000000000000");
}
private static LogEntry getEnhancedLogEntry(TraceLoggingHelper traceLoggingHelper,
SpanContext spanContext) {
LogEntry.Builder logEntryBuilder = LogEntry.newBuilder(null);
traceLoggingHelper.enhanceLogEntry(logEntryBuilder, spanContext);
return logEntryBuilder.build();
}
}