mirror of https://github.com/grpc/grpc-java.git
gcp-observability: Update logging fields for GA and use custom BatchingSettings (#9959)
This commit updates the following in gcp observability logging schema * `payload.status_code` will be of type `google.rpc.Code` instead of `uint32`. * names in enum `Address.TYPE` Use custom batching settings for [LoggingOptions](https://javadoc.io/doc/com.google.cloud/google-cloud-logging/latest/com/google/cloud/logging/LoggingOptions.html) Note: Upgraded `com.google.cloud:google-cloud-logging` from `3.6.1` to `3.14.5`.
This commit is contained in:
parent
c1ff4a856d
commit
1b799adc19
|
@ -20,11 +20,13 @@ tasks.named("compileJava").configure {
|
|||
}
|
||||
|
||||
dependencies {
|
||||
def cloudLoggingVersion = '3.6.1'
|
||||
def cloudLoggingVersion = '3.14.5'
|
||||
|
||||
annotationProcessor libraries.auto.value
|
||||
api project(':grpc-api')
|
||||
|
||||
|
||||
// TODO(dnvindhya): Prefer using our own libraries, update the dependencies
|
||||
// in gradle/libs.versions instead
|
||||
implementation project(':grpc-protobuf'),
|
||||
project(':grpc-stub'),
|
||||
project(':grpc-alts'),
|
||||
|
@ -35,12 +37,10 @@ dependencies {
|
|||
libraries.opencensus.exporter.trace.stackdriver,
|
||||
project(':grpc-xds'), // Align grpc versions
|
||||
project(':grpc-services'), // Align grpc versions
|
||||
libraries.animalsniffer.annotations, // Prefer our version
|
||||
libraries.google.auth.credentials, // Prefer our version
|
||||
libraries.protobuf.java.util, // Prefer our version
|
||||
libraries.gson, // Prefer our version
|
||||
libraries.perfmark.api, // Prefer our version
|
||||
libraries.re2j, // Prefer our version
|
||||
('com.google.protobuf:protobuf-java:3.21.12'),
|
||||
('com.google.api.grpc:proto-google-common-protos:2.14.2'),
|
||||
('com.google.auth:google-auth-library-oauth2-http:1.16.0'),
|
||||
('io.opencensus:opencensus-api:0.31.1'),
|
||||
('com.google.guava:guava:31.1-jre')
|
||||
|
||||
runtimeOnly libraries.opencensus.impl
|
||||
|
|
|
@ -23,6 +23,7 @@ import static io.grpc.InternalMetadata.BASE64_ENCODING_OMIT_PADDING;
|
|||
import com.google.common.base.Joiner;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Duration;
|
||||
import com.google.rpc.Code;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.Deadline;
|
||||
import io.grpc.Grpc;
|
||||
|
@ -182,7 +183,7 @@ public class LogHelper {
|
|||
|
||||
PayloadBuilderHelper<Payload.Builder> pair =
|
||||
createMetadataProto(metadata, maxHeaderBytes);
|
||||
pair.payloadBuilder.setStatusCode(status.getCode().value());
|
||||
pair.payloadBuilder.setStatusCode(Code.forNumber(status.getCode().value()));
|
||||
String statusDescription = status.getDescription();
|
||||
if (statusDescription != null) {
|
||||
pair.payloadBuilder.setStatusMessage(statusDescription);
|
||||
|
@ -404,10 +405,10 @@ public class LogHelper {
|
|||
if (address instanceof InetSocketAddress) {
|
||||
InetAddress inetAddress = ((InetSocketAddress) address).getAddress();
|
||||
if (inetAddress instanceof Inet4Address) {
|
||||
builder.setType(Address.Type.TYPE_IPV4)
|
||||
builder.setType(Address.Type.IPV4)
|
||||
.setAddress(InetAddressUtil.toAddrString(inetAddress));
|
||||
} else if (inetAddress instanceof Inet6Address) {
|
||||
builder.setType(Address.Type.TYPE_IPV6)
|
||||
builder.setType(Address.Type.IPV6)
|
||||
.setAddress(InetAddressUtil.toAddrString(inetAddress));
|
||||
} else {
|
||||
logger.log(Level.SEVERE, "unknown type of InetSocketAddress: {}", address);
|
||||
|
@ -417,7 +418,7 @@ public class LogHelper {
|
|||
} else if (address.getClass().getName().equals("io.netty.channel.unix.DomainSocketAddress")) {
|
||||
// To avoid a compiled time dependency on grpc-netty, we check against the
|
||||
// runtime class name.
|
||||
builder.setType(Address.Type.TYPE_UNIX)
|
||||
builder.setType(Address.Type.UNIX)
|
||||
.setAddress(address.toString());
|
||||
} else {
|
||||
builder.setType(Address.Type.TYPE_UNKNOWN).setAddress(address.toString());
|
||||
|
|
|
@ -18,12 +18,15 @@ package io.grpc.gcp.observability.logging;
|
|||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
import com.google.api.gax.batching.BatchingSettings;
|
||||
import com.google.api.gax.batching.FlowController;
|
||||
import com.google.cloud.MonitoredResource;
|
||||
import com.google.cloud.logging.LogEntry;
|
||||
import com.google.cloud.logging.Logging;
|
||||
import com.google.cloud.logging.LoggingOptions;
|
||||
import com.google.cloud.logging.Payload.JsonPayload;
|
||||
import com.google.cloud.logging.Severity;
|
||||
import com.google.cloud.logging.v2.stub.LoggingServiceV2StubSettings;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
@ -41,6 +44,7 @@ import java.util.Objects;
|
|||
import java.util.Set;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import org.threeten.bp.Duration;
|
||||
|
||||
/**
|
||||
* Sink for Google Cloud Logging.
|
||||
|
@ -102,6 +106,7 @@ public class GcpLogSink implements Sink {
|
|||
if (servicesToExclude.contains(logProto.getServiceName())) {
|
||||
return;
|
||||
}
|
||||
LogEntry grpcLogEntry = null;
|
||||
try {
|
||||
GrpcLogRecord.EventType eventType = logProto.getType();
|
||||
// TODO(DNVindhya): make sure all (int, long) values are not displayed as double
|
||||
|
@ -117,11 +122,18 @@ public class GcpLogSink implements Sink {
|
|||
if (!customTags.isEmpty()) {
|
||||
grpcLogEntryBuilder.setLabels(customTags);
|
||||
}
|
||||
LogEntry grpcLogEntry = grpcLogEntryBuilder.build();
|
||||
grpcLogEntry = grpcLogEntryBuilder.build();
|
||||
synchronized (this) {
|
||||
logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", eventType);
|
||||
gcpLoggingClient.write(Collections.singleton(grpcLogEntry));
|
||||
}
|
||||
} catch (FlowController.FlowControlRuntimeException e) {
|
||||
String grpcLogEntryString = null;
|
||||
if (grpcLogEntry != null) {
|
||||
grpcLogEntryString = grpcLogEntry.toStructuredJsonString();
|
||||
}
|
||||
logger.log(Level.SEVERE, "Limit exceeded while writing log entry to cloud logging");
|
||||
logger.log(Level.SEVERE, "Log entry = ", grpcLogEntryString);
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.SEVERE, "Caught exception while writing to Cloud Logging", e);
|
||||
}
|
||||
|
@ -132,6 +144,16 @@ public class GcpLogSink implements Sink {
|
|||
if (!Strings.isNullOrEmpty(projectId)) {
|
||||
builder.setProjectId(projectId);
|
||||
}
|
||||
BatchingSettings loggingDefaultBatchingSettings = LoggingServiceV2StubSettings.newBuilder()
|
||||
.writeLogEntriesSettings().getBatchingSettings();
|
||||
// Custom batching settings
|
||||
BatchingSettings grpcLoggingVBatchingSettings = loggingDefaultBatchingSettings.toBuilder()
|
||||
.setDelayThreshold(Duration.ofSeconds(1L)).setFlowControlSettings(
|
||||
loggingDefaultBatchingSettings.getFlowControlSettings().toBuilder()
|
||||
.setMaxOutstandingRequestBytes(52428800L) //50 MiB
|
||||
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
|
||||
.build()).build();
|
||||
builder.setBatchingSettings(grpcLoggingVBatchingSettings);
|
||||
return builder.build().getService();
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ package grpc.observabilitylog.v1;
|
|||
|
||||
import "google/protobuf/duration.proto";
|
||||
import "google/protobuf/timestamp.proto";
|
||||
import "google/rpc/code.proto";
|
||||
|
||||
option java_multiple_files = true;
|
||||
option java_package = "io.grpc.observabilitylog.v1";
|
||||
|
@ -97,7 +98,7 @@ message Payload {
|
|||
// the RPC timeout value
|
||||
google.protobuf.Duration timeout = 2;
|
||||
// The gRPC status code
|
||||
uint32 status_code = 3;
|
||||
google.rpc.Code status_code = 3;
|
||||
// The gRPC status message
|
||||
string status_message = 4;
|
||||
// The value of the grpc-status-details-bin metadata key, if any.
|
||||
|
@ -115,9 +116,9 @@ message Payload {
|
|||
message Address {
|
||||
enum Type {
|
||||
TYPE_UNKNOWN = 0;
|
||||
TYPE_IPV4 = 1; // in 1.2.3.4 form
|
||||
TYPE_IPV6 = 2; // IPv6 canonical form (RFC5952 section 4)
|
||||
TYPE_UNIX = 3; // UDS string
|
||||
IPV4 = 1; // in 1.2.3.4 form
|
||||
IPV6 = 2; // IPv6 canonical form (RFC5952 section 4)
|
||||
UNIX = 3; // UDS string
|
||||
}
|
||||
Type type = 1;
|
||||
string address = 2;
|
||||
|
|
|
@ -29,6 +29,7 @@ import static org.mockito.Mockito.verify;
|
|||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Duration;
|
||||
import com.google.protobuf.util.Durations;
|
||||
import com.google.rpc.Code;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.Grpc;
|
||||
import io.grpc.Metadata;
|
||||
|
@ -94,7 +95,7 @@ public class LogHelperTest {
|
|||
assertThat(LogHelper.socketAddressToProto(socketAddress))
|
||||
.isEqualTo(Address
|
||||
.newBuilder()
|
||||
.setType(Address.Type.TYPE_IPV4)
|
||||
.setType(Address.Type.IPV4)
|
||||
.setAddress("127.0.0.1")
|
||||
.setIpPort(12345)
|
||||
.build());
|
||||
|
@ -109,7 +110,7 @@ public class LogHelperTest {
|
|||
assertThat(LogHelper.socketAddressToProto(socketAddress))
|
||||
.isEqualTo(Address
|
||||
.newBuilder()
|
||||
.setType(Address.Type.TYPE_IPV6)
|
||||
.setType(Address.Type.IPV6)
|
||||
.setAddress("2001:db8::2:1") // RFC 5952 section 4: ipv6 canonical form required
|
||||
.setIpPort(12345)
|
||||
.build());
|
||||
|
@ -454,7 +455,7 @@ public class LogHelperTest {
|
|||
builder.setPeer(LogHelper.socketAddressToProto(peer));
|
||||
builder.setPayload(
|
||||
builder.getPayload().toBuilder()
|
||||
.setStatusCode(Status.INTERNAL.getCode().value())
|
||||
.setStatusCode(Code.forNumber(Status.INTERNAL.getCode().value()))
|
||||
.setStatusMessage("test description")
|
||||
.build());
|
||||
GrpcLogRecord base = builder.build();
|
||||
|
|
Loading…
Reference in New Issue