diff --git a/build.gradle b/build.gradle index 9f1d091aa1..3c03c91351 100644 --- a/build.gradle +++ b/build.gradle @@ -192,6 +192,7 @@ subprojects { okhttp: 'com.squareup.okhttp:okhttp:2.5.0', okio: 'com.squareup.okio:okio:1.6.0', opencensus_api: 'io.opencensus:opencensus-api:0.8.0', + opencensus_contrib_grpc_metrics: 'io.opencensus:opencensus-contrib-grpc-metrics:0.8.0', opencensus_impl: 'io.opencensus:opencensus-impl:0.8.0', instrumentation_api: 'com.google.instrumentation:instrumentation-api:0.4.3', protobuf: "com.google.protobuf:protobuf-java:${protobufVersion}", diff --git a/core/build.gradle b/core/build.gradle index b6cd5f51fa..74a31f3bdd 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -19,6 +19,12 @@ dependencies { // we'll always be more up-to-date exclude group: 'io.grpc', module: 'grpc-context' } + compile (libraries.opencensus_contrib_grpc_metrics) { + // prefer 3.0.0 from libraries instead of 3.0.1 + exclude group: 'com.google.code.findbugs', module: 'jsr305' + // we'll always be more up-to-date + exclude group: 'io.grpc', module: 'grpc-context' + } testCompile project(':grpc-testing') diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 60387b83cd..ebbe2ca3a4 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -21,8 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; -import com.google.instrumentation.stats.Stats; -import com.google.instrumentation.stats.StatsContextFactory; import io.grpc.Attributes; import io.grpc.ClientInterceptor; import io.grpc.CompressorRegistry; @@ -150,7 +148,7 @@ public abstract class AbstractManagedChannelImplBuilder private boolean tracingEnabled = true; @Nullable - private StatsContextFactory statsFactory; + private CensusStatsModule censusStatsOverride; protected AbstractManagedChannelImplBuilder(String target) { this.target = Preconditions.checkNotNull(target, "target"); @@ -285,8 +283,8 @@ public abstract class AbstractManagedChannelImplBuilder * Override the default stats implementation. */ @VisibleForTesting - protected final T statsContextFactory(StatsContextFactory statsFactory) { - this.statsFactory = statsFactory; + protected final T overrideCensusStatsModule(CensusStatsModule censusStats) { + this.censusStatsOverride = censusStats; return thisT(); } @@ -344,15 +342,13 @@ public abstract class AbstractManagedChannelImplBuilder List effectiveInterceptors = new ArrayList(this.interceptors); if (statsEnabled) { - StatsContextFactory statsCtxFactory = - this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory(); - if (statsCtxFactory != null) { - CensusStatsModule censusStats = - new CensusStatsModule(statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER, true, recordStats); - // First interceptor runs last (see ClientInterceptors.intercept()), so that no - // other interceptor can override the tracer factory we set in CallOptions. - effectiveInterceptors.add(0, censusStats.getClientInterceptor()); + CensusStatsModule censusStats = this.censusStatsOverride; + if (censusStats == null) { + censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true); } + // First interceptor runs last (see ClientInterceptors.intercept()), so that no + // other interceptor can override the tracer factory we set in CallOptions. + effectiveInterceptors.add(0, censusStats.getClientInterceptor(recordStats)); } if (tracingEnabled) { CensusTracingModule censusTracing = diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index 11c1a9ac4a..29768d1503 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -20,8 +20,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.MoreExecutors; -import com.google.instrumentation.stats.Stats; -import com.google.instrumentation.stats.StatsContextFactory; import io.grpc.BindableService; import io.grpc.CompressorRegistry; import io.grpc.Context; @@ -97,7 +95,7 @@ public abstract class AbstractServerImplBuilder tracerFactories = new ArrayList(); if (statsEnabled) { - StatsContextFactory statsFactory = - this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory(); - if (statsFactory != null) { - CensusStatsModule censusStats = - new CensusStatsModule(statsFactory, GrpcUtil.STOPWATCH_SUPPLIER, true, recordStats); - tracerFactories.add(censusStats.getServerTracerFactory()); + CensusStatsModule censusStats = this.censusStatsOverride; + if (censusStats == null) { + censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true); } + tracerFactories.add(censusStats.getServerTracerFactory(recordStats)); } if (tracingEnabled) { CensusTracingModule censusTracing = diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java index 127b83d6c1..524f3a3996 100644 --- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java +++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java @@ -19,16 +19,11 @@ package io.grpc.internal; import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static com.google.instrumentation.stats.ContextUtils.STATS_CONTEXT_KEY; +import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; -import com.google.instrumentation.stats.MeasurementMap; -import com.google.instrumentation.stats.RpcConstants; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.StatsContextFactory; -import com.google.instrumentation.stats.TagValue; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -42,9 +37,16 @@ import io.grpc.MethodDescriptor; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.StreamTracer; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; +import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.stats.MeasureMap; +import io.opencensus.stats.Stats; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.Tags; +import io.opencensus.tags.propagation.TagContextBinarySerializer; +import io.opencensus.tags.propagation.TagContextSerializationException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -64,49 +66,63 @@ import javax.annotation.Nullable; * starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call and * it's the tracer that reports the summary to Census. */ -final class CensusStatsModule { +public final class CensusStatsModule { private static final Logger logger = Logger.getLogger(CensusStatsModule.class.getName()); private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS.toNanos(1); private static final ClientTracer BLANK_CLIENT_TRACER = new ClientTracer(); - private final StatsContextFactory statsCtxFactory; + private final Tagger tagger; + private final StatsRecorder statsRecorder; private final Supplier stopwatchSupplier; @VisibleForTesting - final Metadata.Key statsHeader; - private final StatsClientInterceptor clientInterceptor = new StatsClientInterceptor(); - private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory(); + final Metadata.Key statsHeader; private final boolean propagateTags; - private final boolean recordStats; - CensusStatsModule( - final StatsContextFactory statsCtxFactory, Supplier stopwatchSupplier, - boolean propagateTags, boolean recordStats) { - this.statsCtxFactory = checkNotNull(statsCtxFactory, "statsCtxFactory"); + /** + * Creates a {@link CensusStatsModule} with the default OpenCensus implementation. + */ + CensusStatsModule(Supplier stopwatchSupplier, boolean propagateTags) { + this( + Tags.getTagger(), + Tags.getTagPropagationComponent().getBinarySerializer(), + Stats.getStatsRecorder(), + stopwatchSupplier, + propagateTags); + } + + /** + * Creates a {@link CensusStatsModule} with the given OpenCensus implementation. + */ + public CensusStatsModule( + final Tagger tagger, + final TagContextBinarySerializer tagCtxSerializer, + StatsRecorder statsRecorder, Supplier stopwatchSupplier, + boolean propagateTags) { + this.tagger = checkNotNull(tagger, "tagger"); + this.statsRecorder = checkNotNull(statsRecorder, "statsRecorder"); + checkNotNull(tagCtxSerializer, "tagCtxSerializer"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.propagateTags = propagateTags; - this.recordStats = recordStats; this.statsHeader = - Metadata.Key.of("grpc-tags-bin", new Metadata.BinaryMarshaller() { + Metadata.Key.of("grpc-tags-bin", new Metadata.BinaryMarshaller() { @Override - public byte[] toBytes(StatsContext context) { + public byte[] toBytes(TagContext context) { // TODO(carl-mastrangelo): currently we only make sure the correctness. We may need to // optimize out the allocation and copy in the future. - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); try { - context.serialize(buffer); - } catch (IOException e) { + return tagCtxSerializer.toByteArray(context); + } catch (TagContextSerializationException e) { throw new RuntimeException(e); } - return buffer.toByteArray(); } @Override - public StatsContext parseBytes(byte[] serialized) { + public TagContext parseBytes(byte[] serialized) { try { - return statsCtxFactory.deserialize(new ByteArrayInputStream(serialized)); + return tagCtxSerializer.fromByteArray(serialized); } catch (Exception e) { logger.log(Level.FINE, "Failed to parse stats header", e); - return statsCtxFactory.getDefault(); + return tagger.empty(); } } }); @@ -116,22 +132,23 @@ final class CensusStatsModule { * Creates a {@link ClientCallTracer} for a new call. */ @VisibleForTesting - ClientCallTracer newClientCallTracer(StatsContext parentCtx, String fullMethodName) { - return new ClientCallTracer(this, parentCtx, fullMethodName); + ClientCallTracer newClientCallTracer( + TagContext parentCtx, String fullMethodName, boolean recordStats) { + return new ClientCallTracer(this, parentCtx, fullMethodName, recordStats); } /** * Returns the server tracer factory. */ - ServerStreamTracer.Factory getServerTracerFactory() { - return serverTracerFactory; + ServerStreamTracer.Factory getServerTracerFactory(boolean recordStats) { + return new ServerTracerFactory(recordStats); } /** * Returns the client interceptor that facilitates Census-based stats reporting. */ - ClientInterceptor getClientInterceptor() { - return clientInterceptor; + ClientInterceptor getClientInterceptor(boolean recordStats) { + return new StatsClientInterceptor(recordStats); } private static final class ClientTracer extends ClientStreamTracer { @@ -203,13 +220,19 @@ final class CensusStatsModule { private final Stopwatch stopwatch; private volatile ClientTracer streamTracer; private volatile int callEnded; - private final StatsContext parentCtx; + private final TagContext parentCtx; + private final boolean recordStats; - ClientCallTracer(CensusStatsModule module, StatsContext parentCtx, String fullMethodName) { + ClientCallTracer( + CensusStatsModule module, + TagContext parentCtx, + String fullMethodName, + boolean recordStats) { this.module = module; this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); this.stopwatch = module.stopwatchSupplier.get().start(); + this.recordStats = recordStats; } @Override @@ -222,7 +245,7 @@ final class CensusStatsModule { "Are you creating multiple streams per call? This class doesn't yet support this case."); if (module.propagateTags) { headers.discardAll(module.statsHeader); - if (parentCtx != module.statsCtxFactory.getDefault()) { + if (!module.tagger.empty().equals(parentCtx)) { headers.put(module.statsHeader, parentCtx); } } @@ -239,7 +262,7 @@ final class CensusStatsModule { if (callEndedUpdater.getAndSet(this, 1) != 0) { return; } - if (!module.recordStats) { + if (!recordStats) { return; } stopwatch.stop(); @@ -248,27 +271,29 @@ final class CensusStatsModule { if (tracer == null) { tracer = BLANK_CLIENT_TRACER; } - MeasurementMap.Builder builder = MeasurementMap.builder() + MeasureMap measureMap = module.statsRecorder.newMeasureMap() // The metrics are in double - .put(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI) - .put(RpcConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount) - .put(RpcConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount) - .put(RpcConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize) - .put(RpcConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize) + .put(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI) + .put(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount) + .put(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount) + .put(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize) + .put(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize) .put( - RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES, + RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES, tracer.outboundUncompressedSize) .put( - RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES, + RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES, tracer.inboundUncompressedSize); if (!status.isOk()) { - builder.put(RpcConstants.RPC_CLIENT_ERROR_COUNT, 1.0); + measureMap.put(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT, 1); } - parentCtx - .with( - RpcConstants.RPC_CLIENT_METHOD, TagValue.create(fullMethodName), - RpcConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) - .record(builder.build()); + measureMap.record( + module + .tagger + .toBuilder(parentCtx) + .put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName)) + .put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) + .build()); } } @@ -291,10 +316,11 @@ final class CensusStatsModule { private final CensusStatsModule module; private final String fullMethodName; @Nullable - private final StatsContext parentCtx; + private final TagContext parentCtx; private volatile int streamClosed; private final Stopwatch stopwatch; - private final StatsContextFactory statsCtxFactory; + private final Tagger tagger; + private final boolean recordStats; private volatile long outboundMessageCount; private volatile long inboundMessageCount; private volatile long outboundWireSize; @@ -305,14 +331,16 @@ final class CensusStatsModule { ServerTracer( CensusStatsModule module, String fullMethodName, - StatsContext parentCtx, + TagContext parentCtx, Supplier stopwatchSupplier, - StatsContextFactory statsCtxFactory) { + Tagger tagger, + boolean recordStats) { this.module = module; this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.stopwatch = stopwatchSupplier.get().start(); - this.statsCtxFactory = statsCtxFactory; + this.tagger = tagger; + this.recordStats = recordStats; } @Override @@ -356,33 +384,36 @@ final class CensusStatsModule { if (streamClosedUpdater.getAndSet(this, 1) != 0) { return; } - if (!module.recordStats) { + if (!recordStats) { return; } stopwatch.stop(); long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); - MeasurementMap.Builder builder = MeasurementMap.builder() + MeasureMap measureMap = module.statsRecorder.newMeasureMap() // The metrics are in double - .put(RpcConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI) - .put(RpcConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount) - .put(RpcConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount) - .put(RpcConstants.RPC_SERVER_RESPONSE_BYTES, outboundWireSize) - .put(RpcConstants.RPC_SERVER_REQUEST_BYTES, inboundWireSize) - .put(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES, outboundUncompressedSize) - .put(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES, inboundUncompressedSize); + .put(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI) + .put(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount) + .put(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount) + .put(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES, outboundWireSize) + .put(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES, inboundWireSize) + .put(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES, outboundUncompressedSize) + .put(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES, inboundUncompressedSize); if (!status.isOk()) { - builder.put(RpcConstants.RPC_SERVER_ERROR_COUNT, 1.0); + measureMap.put(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT, 1); } - StatsContext ctx = firstNonNull(parentCtx, statsCtxFactory.getDefault()); - ctx - .with(RpcConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) - .record(builder.build()); + TagContext ctx = firstNonNull(parentCtx, tagger.empty()); + measureMap.record( + module + .tagger + .toBuilder(ctx) + .put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) + .build()); } @Override public Context filterContext(Context context) { - if (parentCtx != statsCtxFactory.getDefault()) { - return context.withValue(STATS_CONTEXT_KEY, parentCtx); + if (!tagger.empty().equals(parentCtx)) { + return context.withValue(TAG_CONTEXT_KEY, parentCtx); } return context; } @@ -390,27 +421,48 @@ final class CensusStatsModule { @VisibleForTesting final class ServerTracerFactory extends ServerStreamTracer.Factory { + private final boolean recordStats; + + ServerTracerFactory(boolean recordStats) { + this.recordStats = recordStats; + } + @Override public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) { - StatsContext parentCtx = headers.get(statsHeader); + TagContext parentCtx = headers.get(statsHeader); if (parentCtx == null) { - parentCtx = statsCtxFactory.getDefault(); + parentCtx = tagger.empty(); } - parentCtx = parentCtx.with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(fullMethodName)); + parentCtx = + tagger + .toBuilder(parentCtx) + .put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName)) + .build(); return new ServerTracer( - CensusStatsModule.this, fullMethodName, parentCtx, stopwatchSupplier, statsCtxFactory); + CensusStatsModule.this, + fullMethodName, + parentCtx, + stopwatchSupplier, + tagger, + recordStats); } } @VisibleForTesting final class StatsClientInterceptor implements ClientInterceptor { + private final boolean recordStats; + + StatsClientInterceptor(boolean recordStats) { + this.recordStats = recordStats; + } + @Override public ClientCall interceptCall( MethodDescriptor method, CallOptions callOptions, Channel next) { - // New RPCs on client-side inherit the stats context from the current Context. - StatsContext parentCtx = statsCtxFactory.getCurrentStatsContext(); + // New RPCs on client-side inherit the tag context from the current Context. + TagContext parentCtx = tagger.getCurrentTagContext(); final ClientCallTracer tracerFactory = - newClientCallTracer(parentCtx, method.getFullMethodName()); + newClientCallTracer(parentCtx, method.getFullMethodName(), recordStats); ClientCall call = next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory)); return new SimpleForwardingClientCall(call) { diff --git a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java index a874a7b0ec..3ca5db366a 100644 --- a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java @@ -27,8 +27,6 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import com.google.common.util.concurrent.MoreExecutors; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.StatsContextFactory; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -38,7 +36,9 @@ import io.grpc.DecompressorRegistry; import io.grpc.LoadBalancer; import io.grpc.MethodDescriptor; import io.grpc.NameResolver; -import java.io.InputStream; +import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; +import io.grpc.internal.testing.StatsTestUtils.FakeTagger; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; @@ -61,19 +61,6 @@ public class AbstractManagedChannelImplBuilderTest { } }; - private static final StatsContextFactory DUMMY_STATS_FACTORY = - new StatsContextFactory() { - @Override - public StatsContext deserialize(InputStream input) { - throw new UnsupportedOperationException(); - } - - @Override - public StatsContext getDefault() { - throw new UnsupportedOperationException(); - } - }; - private Builder builder = new Builder("fake"); private Builder directAddressBuilder = new Builder(new SocketAddress(){}, "fake"); @@ -346,12 +333,24 @@ public class AbstractManagedChannelImplBuilderTest { static class Builder extends AbstractManagedChannelImplBuilder { Builder(String target) { super(target); - statsContextFactory(DUMMY_STATS_FACTORY); + overrideCensusStatsModule( + new CensusStatsModule( + new FakeTagger(), + new FakeTagContextBinarySerializer(), + new FakeStatsRecorder(), + GrpcUtil.STOPWATCH_SUPPLIER, + true)); } Builder(SocketAddress directServerAddress, String authority) { super(directServerAddress, authority); - statsContextFactory(DUMMY_STATS_FACTORY); + overrideCensusStatsModule( + new CensusStatsModule( + new FakeTagger(), + new FakeTagContextBinarySerializer(), + new FakeStatsRecorder(), + GrpcUtil.STOPWATCH_SUPPLIER, + true)); } @Override diff --git a/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java index fed373a4b4..ed85ac377e 100644 --- a/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java @@ -19,12 +19,12 @@ package io.grpc.internal; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.StatsContextFactory; import io.grpc.Metadata; import io.grpc.ServerStreamTracer; +import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; +import io.grpc.internal.testing.StatsTestUtils.FakeTagger; import java.io.File; -import java.io.InputStream; import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; @@ -33,18 +33,6 @@ import org.junit.runners.JUnit4; /** Unit tests for {@link AbstractServerImplBuilder}. */ @RunWith(JUnit4.class) public class AbstractServerImplBuilderTest { - private static final StatsContextFactory DUMMY_STATS_FACTORY = - new StatsContextFactory() { - @Override - public StatsContext deserialize(InputStream input) { - throw new UnsupportedOperationException(); - } - - @Override - public StatsContext getDefault() { - throw new UnsupportedOperationException(); - } - }; private static final ServerStreamTracer.Factory DUMMY_USER_TRACER = new ServerStreamTracer.Factory() { @@ -97,7 +85,13 @@ public class AbstractServerImplBuilderTest { static class Builder extends AbstractServerImplBuilder { Builder() { - statsContextFactory(DUMMY_STATS_FACTORY); + overrideCensusStatsModule( + new CensusStatsModule( + new FakeTagger(), + new FakeTagContextBinarySerializer(), + new FakeStatsRecorder(), + GrpcUtil.STOPWATCH_SUPPLIER, + true)); } @Override diff --git a/core/src/test/java/io/grpc/internal/CensusModulesTest.java b/core/src/test/java/io/grpc/internal/CensusModulesTest.java index 8828db1599..d77b268ce3 100644 --- a/core/src/test/java/io/grpc/internal/CensusModulesTest.java +++ b/core/src/test/java/io/grpc/internal/CensusModulesTest.java @@ -16,7 +16,7 @@ package io.grpc.internal; -import static com.google.instrumentation.stats.ContextUtils.STATS_CONTEXT_KEY; +import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -40,9 +40,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import com.google.instrumentation.stats.RpcConstants; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.TagValue; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -58,9 +55,15 @@ import io.grpc.ServerServiceDefinition; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.testing.StatsTestUtils; -import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory; +import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; +import io.grpc.internal.testing.StatsTestUtils.FakeTagger; import io.grpc.internal.testing.StatsTestUtils.MockableSpan; import io.grpc.testing.GrpcServerRule; +import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tags; import io.opencensus.trace.EndSpanOptions; import io.opencensus.trace.NetworkEvent; import io.opencensus.trace.NetworkEvent.Type; @@ -71,7 +74,6 @@ import io.opencensus.trace.Tracer; import io.opencensus.trace.propagation.BinaryFormat; import io.opencensus.trace.propagation.SpanContextParseException; import io.opencensus.trace.unsafe.ContextUtils; -import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.List; import java.util.Random; @@ -137,7 +139,10 @@ public class CensusModulesTest { method.toBuilder().setSampledToLocalTracing(true).build(); private final FakeClock fakeClock = new FakeClock(); - private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory(); + private final FakeTagger tagger = new FakeTagger(); + private final FakeTagContextBinarySerializer tagCtxSerializer = + new FakeTagContextBinarySerializer(); + private final FakeStatsRecorder statsRecorder = new FakeStatsRecorder(); private final Random random = new Random(1234); private final Span fakeClientParentSpan = MockableSpan.generateRandomSpan(random); private final Span spyClientSpan = spy(MockableSpan.generateRandomSpan(random)); @@ -185,13 +190,14 @@ public class CensusModulesTest { when(mockTracingPropagationHandler.fromByteArray(any(byte[].class))) .thenReturn(fakeClientSpanContext); censusStats = - new CensusStatsModule(statsCtxFactory, fakeClock.getStopwatchSupplier(), true, true); + new CensusStatsModule( + tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), true); censusTracing = new CensusTracingModule(tracer, mockTracingPropagationHandler); } @After public void wrapUp() { - assertNull(statsCtxFactory.pollRecord()); + assertNull(statsRecorder.pollRecord()); } @Test @@ -204,7 +210,7 @@ public class CensusModulesTest { testClientInterceptors(true); } - // Test that Census ClientInterceptors uses the StatsContext and Span out of the current Context + // Test that Census ClientInterceptors uses the TagContext and Span out of the current Context // to create the ClientCallTracer, and that it intercepts ClientCall.Listener.onClose() to call // ClientCallTracer.callEnded(). private void testClientInterceptors(boolean nonDefaultContext) { @@ -234,14 +240,14 @@ public class CensusModulesTest { Channel interceptedChannel = ClientInterceptors.intercept( grpcServerRule.getChannel(), callOptionsCaptureInterceptor, - censusStats.getClientInterceptor(), censusTracing.getClientInterceptor()); + censusStats.getClientInterceptor(true), censusTracing.getClientInterceptor()); ClientCall call; if (nonDefaultContext) { Context ctx = Context.ROOT.withValues( - STATS_CONTEXT_KEY, - statsCtxFactory.getDefault().with( - StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")), + TAG_CONTEXT_KEY, + tagger.emptyBuilder().put( + StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")).build(), ContextUtils.CONTEXT_SPAN_KEY, fakeClientParentSpan); Context origCtx = ctx.attach(); @@ -251,7 +257,7 @@ public class CensusModulesTest { ctx.detach(origCtx); } } else { - assertNull(STATS_CONTEXT_KEY.get()); + assertEquals(Tags.getTagger().empty(), TAG_CONTEXT_KEY.get()); assertNull(ContextUtils.CONTEXT_SPAN_KEY.get()); call = interceptedChannel.newCall(method, CALL_OPTIONS); } @@ -269,7 +275,7 @@ public class CensusModulesTest { // Make the call Metadata headers = new Metadata(); call.start(mockClientCallListener, headers); - assertNull(statsCtxFactory.pollRecord()); + assertNull(statsRecorder.pollRecord()); if (nonDefaultContext) { verify(tracer).spanBuilderWithExplicitParent( eq("Sent.package1.service2.method3"), same(fakeClientParentSpan)); @@ -291,15 +297,15 @@ public class CensusModulesTest { assertEquals("No you don't", status.getDescription()); // The intercepting listener calls callEnded() on ClientCallTracer, which records to Census. - StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); - TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD); - assertEquals(method.getFullMethodName(), methodTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.PERMISSION_DENIED.toString(), statusTag.toString()); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.PERMISSION_DENIED.toString(), statusTag.asString()); if (nonDefaultContext) { TagValue extraTag = record.tags.get(StatsTestUtils.EXTRA_TAG); - assertEquals("extra value", extraTag.toString()); + assertEquals("extra value", extraTag.asString()); } else { assertNull(record.tags.get(StatsTestUtils.EXTRA_TAG)); } @@ -316,7 +322,7 @@ public class CensusModulesTest { @Test public void clientBasicStatsDefaultContext() { CensusStatsModule.ClientCallTracer callTracer = - censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName()); + censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName(), true); Metadata headers = new Metadata(); ClientStreamTracer tracer = callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); @@ -343,24 +349,27 @@ public class CensusModulesTest { tracer.streamClosed(Status.OK); callTracer.callEnded(Status.OK); - StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); assertNoServerContent(record); - TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD); - assertEquals(method.getFullMethodName(), methodTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.OK.toString(), statusTag.toString()); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT)); - assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_COUNT)); - assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); - assertEquals(1128 + 865, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); - assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_COUNT)); - assertEquals(33 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.OK.toString(), statusTag.asString()); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals( + 1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); + assertEquals( + 1128 + 865, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertEquals( + 33 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); assertEquals(67 + 552, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); assertEquals(30 + 100 + 16 + 24, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); } @Test @@ -424,29 +433,30 @@ public class CensusModulesTest { public void clientStreamNeverCreatedStillRecordStats() { CensusStatsModule.ClientCallTracer callTracer = censusStats.newClientCallTracer( - statsCtxFactory.getDefault(), method.getFullMethodName()); + tagger.empty(), method.getFullMethodName(), true); fakeClock.forwardTime(3000, MILLISECONDS); callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds")); - StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); assertNoServerContent(record); - TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD); - assertEquals(method.getFullMethodName(), methodTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.toString()); - assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ERROR_COUNT)); - assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_COUNT)); - assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.asString()); + assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); + assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); assertEquals(0, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); - assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_COUNT)); - assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); assertEquals(0, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); - assertEquals(3000, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertEquals( + 3000, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); } @Test @@ -492,13 +502,18 @@ public class CensusModulesTest { // EXTRA_TAG is propagated by the FakeStatsContextFactory. Note that not all tags are // propagated. The StatsContextFactory decides which tags are to propagated. gRPC facilitates // the propagation by putting them in the headers. - StatsContext clientCtx = statsCtxFactory.getDefault().with( - StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value-897")); - CensusStatsModule census = new CensusStatsModule( - statsCtxFactory, fakeClock.getStopwatchSupplier(), propagate, recordStats); + TagContext clientCtx = tagger.emptyBuilder().put( + StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value-897")).build(); + CensusStatsModule census = + new CensusStatsModule( + tagger, + tagCtxSerializer, + statsRecorder, + fakeClock.getStopwatchSupplier(), + propagate); Metadata headers = new Metadata(); CensusStatsModule.ClientCallTracer callTracer = - census.newClientCallTracer(clientCtx, method.getFullMethodName()); + census.newClientCallTracer(clientCtx, method.getFullMethodName(), recordStats); // This propagates clientCtx to headers if propagates==true callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); if (propagate) { @@ -509,30 +524,32 @@ public class CensusModulesTest { } ServerStreamTracer serverTracer = - census.getServerTracerFactory().newServerStreamTracer( + census.getServerTracerFactory(recordStats).newServerStreamTracer( method.getFullMethodName(), headers); // Server tracer deserializes clientCtx from the headers, so that it records stats with the // propagated tags. Context serverContext = serverTracer.filterContext(Context.ROOT); // It also put clientCtx in the Context seen by the call handler assertEquals( - clientCtx.with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(method.getFullMethodName())), - STATS_CONTEXT_KEY.get(serverContext)); + tagger.toBuilder(clientCtx).put( + RpcMeasureConstants.RPC_METHOD, + TagValue.create(method.getFullMethodName())).build(), + TAG_CONTEXT_KEY.get(serverContext)); // Verifies that the server tracer records the status with the propagated tag serverTracer.streamClosed(Status.OK); if (recordStats) { - StatsTestUtils.MetricsRecord serverRecord = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord serverRecord = statsRecorder.pollRecord(); assertNotNull(serverRecord); assertNoClientContent(serverRecord); - TagValue serverMethodTag = serverRecord.tags.get(RpcConstants.RPC_SERVER_METHOD); - assertEquals(method.getFullMethodName(), serverMethodTag.toString()); - TagValue serverStatusTag = serverRecord.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.OK.toString(), serverStatusTag.toString()); - assertNull(serverRecord.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT)); + TagValue serverMethodTag = serverRecord.tags.get(RpcMeasureConstants.RPC_METHOD); + assertEquals(method.getFullMethodName(), serverMethodTag.asString()); + TagValue serverStatusTag = serverRecord.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.OK.toString(), serverStatusTag.asString()); + assertNull(serverRecord.getMetric(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT)); TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG); - assertEquals("extra-tag-value-897", serverPropagatedTag.toString()); + assertEquals("extra-tag-value-897", serverPropagatedTag.asString()); } // Verifies that the client tracer factory uses clientCtx, which includes the custom tags, to @@ -540,27 +557,27 @@ public class CensusModulesTest { callTracer.callEnded(Status.OK); if (recordStats) { - StatsTestUtils.MetricsRecord clientRecord = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord clientRecord = statsRecorder.pollRecord(); assertNotNull(clientRecord); assertNoServerContent(clientRecord); - TagValue clientMethodTag = clientRecord.tags.get(RpcConstants.RPC_CLIENT_METHOD); - assertEquals(method.getFullMethodName(), clientMethodTag.toString()); - TagValue clientStatusTag = clientRecord.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.OK.toString(), clientStatusTag.toString()); - assertNull(clientRecord.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT)); + TagValue clientMethodTag = clientRecord.tags.get(RpcMeasureConstants.RPC_METHOD); + assertEquals(method.getFullMethodName(), clientMethodTag.asString()); + TagValue clientStatusTag = clientRecord.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.OK.toString(), clientStatusTag.asString()); + assertNull(clientRecord.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG); - assertEquals("extra-tag-value-897", clientPropagatedTag.toString()); + assertEquals("extra-tag-value-897", clientPropagatedTag.asString()); } if (!recordStats) { - assertNull(statsCtxFactory.pollRecord()); + assertNull(statsRecorder.pollRecord()); } } @Test public void statsHeadersNotPropagateDefaultContext() { CensusStatsModule.ClientCallTracer callTracer = - censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName()); + censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName(), true); Metadata headers = new Metadata(); callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); assertFalse(headers.containsKey(censusStats.statsHeader)); @@ -573,7 +590,7 @@ public class CensusModulesTest { Metadata.Key arbitraryStatsHeader = Metadata.Key.of("grpc-tags-bin", Metadata.BINARY_BYTE_MARSHALLER); try { - statsCtxFactory.deserialize(new ByteArrayInputStream(statsHeaderValue)); + tagCtxSerializer.fromByteArray(statsHeaderValue); fail("Should have thrown"); } catch (Exception e) { // Expected @@ -583,7 +600,7 @@ public class CensusModulesTest { Metadata headers = new Metadata(); assertNull(headers.get(censusStats.statsHeader)); headers.put(arbitraryStatsHeader, statsHeaderValue); - assertSame(statsCtxFactory.getDefault(), headers.get(censusStats.statsHeader)); + assertSame(tagger.empty(), headers.get(censusStats.statsHeader)); } @Test @@ -641,15 +658,19 @@ public class CensusModulesTest { @Test public void serverBasicStatsNoHeaders() { - ServerStreamTracer.Factory tracerFactory = censusStats.getServerTracerFactory(); + ServerStreamTracer.Factory tracerFactory = censusStats.getServerTracerFactory(true); ServerStreamTracer tracer = tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); Context filteredContext = tracer.filterContext(Context.ROOT); - StatsContext statsCtx = STATS_CONTEXT_KEY.get(filteredContext); + TagContext statsCtx = TAG_CONTEXT_KEY.get(filteredContext); assertEquals( - statsCtxFactory.getDefault() - .with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(method.getFullMethodName())), + tagger + .emptyBuilder() + .put( + RpcMeasureConstants.RPC_METHOD, + TagValue.create(method.getFullMethodName())) + .build(), statsCtx); tracer.inboundMessage(0); @@ -673,24 +694,27 @@ public class CensusModulesTest { tracer.streamClosed(Status.CANCELLED); - StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); assertNoClientContent(record); - TagValue methodTag = record.tags.get(RpcConstants.RPC_SERVER_METHOD); - assertEquals(method.getFullMethodName(), methodTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.CANCELLED.toString(), statusTag.toString()); - assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_ERROR_COUNT)); - assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_COUNT)); - assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_BYTES)); - assertEquals(1128 + 865, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); - assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_COUNT)); - assertEquals(34 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_BYTES)); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.CANCELLED.toString(), statusTag.asString()); + assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT)); + assertEquals( + 1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES)); + assertEquals( + 1128 + 865, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT)); + assertEquals( + 34 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES)); assertEquals(67 + 552, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); assertEquals(100 + 16 + 24, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_SERVER_LATENCY)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY)); } @Test @@ -803,27 +827,27 @@ public class CensusModulesTest { } private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) { - assertNull(record.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_ELAPSED_TIME)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_LATENCY)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_SERVER_ELAPSED_TIME)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); } private static void assertNoClientContent(StatsTestUtils.MetricsRecord record) { - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); } private static class FakeServerCall extends ServerCall { diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index 8dff45f687..ce734f4f53 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -17,9 +17,9 @@ package io.grpc.testing.integration; import static com.google.common.truth.Truth.assertThat; -import static com.google.instrumentation.stats.ContextUtils.STATS_CONTEXT_KEY; import static io.grpc.stub.ClientCalls.blockingServerStreamingCall; import static io.grpc.testing.integration.Messages.PayloadType.COMPRESSABLE; +import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY; import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -42,10 +42,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.SettableFuture; -import com.google.instrumentation.stats.RpcConstants; -import com.google.instrumentation.stats.StatsContextFactory; -import com.google.instrumentation.stats.TagKey; -import com.google.instrumentation.stats.TagValue; import com.google.protobuf.BoolValue; import com.google.protobuf.ByteString; import com.google.protobuf.EmptyProtos.Empty; @@ -71,12 +67,15 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.auth.MoreCallCredentials; import io.grpc.internal.AbstractServerImplBuilder; +import io.grpc.internal.CensusStatsModule; import io.grpc.internal.GrpcUtil; -import io.grpc.internal.testing.StatsTestUtils.FakeStatsContext; -import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory; +import io.grpc.internal.testing.StatsTestUtils; +import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContext; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; +import io.grpc.internal.testing.StatsTestUtils.FakeTagger; import io.grpc.internal.testing.StatsTestUtils.MetricsRecord; import io.grpc.internal.testing.StatsTestUtils.MockableSpan; -import io.grpc.internal.testing.StatsTestUtils; import io.grpc.internal.testing.StreamRecorder; import io.grpc.internal.testing.TestClientStreamTracer; import io.grpc.internal.testing.TestServerStreamTracer; @@ -97,6 +96,9 @@ import io.grpc.testing.integration.Messages.StreamingInputCallRequest; import io.grpc.testing.integration.Messages.StreamingInputCallResponse; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; +import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; import io.opencensus.trace.Span; import io.opencensus.trace.unsafe.ContextUtils; import java.io.ByteArrayInputStream; @@ -153,10 +155,11 @@ public abstract class AbstractInteropTest { new AtomicReference(); private static ScheduledExecutorService testServiceExecutor; private static Server server; - private static final FakeStatsContextFactory clientStatsCtxFactory = - new FakeStatsContextFactory(); - private static final FakeStatsContextFactory serverStatsCtxFactory = - new FakeStatsContextFactory(); + private static final FakeTagger tagger = new FakeTagger(); + private static final FakeTagContextBinarySerializer tagContextBinarySerializer = + new FakeTagContextBinarySerializer(); + private static final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder(); + private static final FakeStatsRecorder serverStatsRecorder = new FakeStatsRecorder(); private static final LinkedBlockingQueue serverStreamTracers = new LinkedBlockingQueue(); @@ -212,7 +215,14 @@ public abstract class AbstractInteropTest { new TestServiceImpl(testServiceExecutor), allInterceptors)) .addStreamTracerFactory(serverStreamTracerFactory); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, serverStatsCtxFactory); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, + new CensusStatsModule( + tagger, + tagContextBinarySerializer, + serverStatsRecorder, + GrpcUtil.STOPWATCH_SUPPLIER, + true)); try { server = builder.build().start(); } catch (IOException ex) { @@ -266,8 +276,8 @@ public abstract class AbstractInteropTest { TestServiceGrpc.newBlockingStub(channel).withInterceptors(tracerSetupInterceptor); asyncStub = TestServiceGrpc.newStub(channel).withInterceptors(tracerSetupInterceptor); requestHeadersCapture.set(null); - clientStatsCtxFactory.rolloverRecords(); - serverStatsCtxFactory.rolloverRecords(); + clientStatsRecorder.rolloverRecords(); + serverStatsRecorder.rolloverRecords(); serverStreamTracers.clear(); } @@ -281,8 +291,9 @@ public abstract class AbstractInteropTest { protected abstract ManagedChannel createChannel(); - protected final StatsContextFactory getClientStatsFactory() { - return clientStatsCtxFactory; + protected final CensusStatsModule createClientCensusStatsModule() { + return new CensusStatsModule( + tagger, tagContextBinarySerializer, clientStatsRecorder, GrpcUtil.STOPWATCH_SUPPLIER, true); } /** @@ -711,10 +722,9 @@ public abstract class AbstractInteropTest { // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be // recorded. The tracer stats rely on the stream being created, which is not always the case // in this test. Therefore we don't check the tracer stats. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( - clientRecord, false, "grpc.testing.TestService/StreamingInputCall", - Status.CANCELLED.getCode()); + clientRecord, "grpc.testing.TestService/StreamingInputCall", Status.CANCELLED.getCode()); // Do not check server-side metrics, because the status on the server side is undetermined. } } @@ -1034,9 +1044,10 @@ public abstract class AbstractInteropTest { if (metricsExpected()) { // Stream may not have been created before deadline is exceeded, thus we don't test the tracer // stats. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( - clientRecord, false, "grpc.testing.TestService/StreamingOutputCall", + clientRecord, + "grpc.testing.TestService/StreamingOutputCall", Status.Code.DEADLINE_EXCEEDED); // Do not check server-side metrics, because the status on the server side is undetermined. } @@ -1067,9 +1078,10 @@ public abstract class AbstractInteropTest { if (metricsExpected()) { // Stream may not have been created when deadline is exceeded, thus we don't check tracer // stats. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( - clientRecord, false, "grpc.testing.TestService/StreamingOutputCall", + clientRecord, + "grpc.testing.TestService/StreamingOutputCall", Status.Code.DEADLINE_EXCEEDED); // Do not check server-side metrics, because the status on the server side is undetermined. } @@ -1091,10 +1103,9 @@ public abstract class AbstractInteropTest { // recorded. The tracer stats rely on the stream being created, which is not the case if // deadline is exceeded before the call is created. Therefore we don't check the tracer stats. if (metricsExpected()) { - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( - clientRecord, false, "grpc.testing.TestService/EmptyCall", - Status.DEADLINE_EXCEEDED.getCode()); + clientRecord, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode()); } // warm up the channel @@ -1109,10 +1120,9 @@ public abstract class AbstractInteropTest { } assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK); if (metricsExpected()) { - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( - clientRecord, false, "grpc.testing.TestService/EmptyCall", - Status.DEADLINE_EXCEEDED.getCode()); + clientRecord, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode()); } } @@ -1397,9 +1407,9 @@ public abstract class AbstractInteropTest { Span clientParentSpan = MockableSpan.generateRandomSpan(new Random()); Context ctx = Context.ROOT.withValues( - STATS_CONTEXT_KEY, - clientStatsCtxFactory.getDefault().with( - StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")), + TAG_CONTEXT_KEY, + tagger.emptyBuilder().put( + StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")).build(), ContextUtils.CONTEXT_SPAN_KEY, clientParentSpan); Context origCtx = ctx.attach(); @@ -1408,7 +1418,7 @@ public abstract class AbstractInteropTest { Context serverCtx = contextCapture.get(); assertNotNull(serverCtx); - FakeStatsContext statsCtx = (FakeStatsContext) STATS_CONTEXT_KEY.get(serverCtx); + FakeTagContext statsCtx = (FakeTagContext) TAG_CONTEXT_KEY.get(serverCtx); assertNotNull(statsCtx); Map tags = statsCtx.getTags(); boolean tagFound = false; @@ -1530,9 +1540,10 @@ public abstract class AbstractInteropTest { // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be // recorded. The tracer stats rely on the stream being created, which is not always the case // in this test, thus we will not check that. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( - clientRecord, false, "grpc.testing.TestService/FullDuplexCall", + clientRecord, + "grpc.testing.TestService/FullDuplexCall", Status.DEADLINE_EXCEEDED.getCode()); } } @@ -1792,8 +1803,8 @@ public abstract class AbstractInteropTest { if (metricsExpected()) { // CensusStreamTracerModule records final status in interceptor, which is guaranteed to be // done before application receives status. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(); - checkTags(clientRecord, false, method, code); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(); + checkTags(clientRecord, method, code); if (requests != null && responses != null) { checkCensus(clientRecord, false, requests, responses); @@ -1824,7 +1835,7 @@ public abstract class AbstractInteropTest { try { // On the server, the stats is finalized in ServerStreamListener.closed(), which can be // run after the client receives the final status. So we use a timeout. - serverRecord = serverStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + serverRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1832,7 +1843,7 @@ public abstract class AbstractInteropTest { break; } try { - checkTags(serverRecord, true, method, code); + checkTags(serverRecord, method, code); if (requests != null && responses != null) { checkCensus(serverRecord, true, requests, responses); } @@ -1889,15 +1900,14 @@ public abstract class AbstractInteropTest { } private static void checkTags( - MetricsRecord record, boolean server, String methodName, Status.Code status) { + MetricsRecord record, String methodName, Status.Code status) { assertNotNull("record is not null", record); - TagValue methodNameTag = record.tags.get( - server ? RpcConstants.RPC_SERVER_METHOD : RpcConstants.RPC_CLIENT_METHOD); + TagValue methodNameTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); assertNotNull("method name tagged", methodNameTag); - assertEquals("method names match", methodName, methodNameTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); + assertEquals("method names match", methodName, methodNameTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); assertNotNull("status tagged", statusTag); - assertEquals(status.toString(), statusTag.toString()); + assertEquals(status.toString(), statusTag.asString()); } /** @@ -1950,33 +1960,41 @@ public abstract class AbstractInteropTest { } if (server && serverInProcess()) { assertEquals( - requests.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_COUNT)); + requests.size(), + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT)); assertEquals( - responses.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_COUNT)); - assertEquals(uncompressedRequestsSize, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); - assertEquals(uncompressedResponsesSize, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); - assertNotNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_LATENCY)); + responses.size(), + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT)); + assertEquals( + uncompressedRequestsSize, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals( + uncompressedResponsesSize, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY)); // It's impossible to get the expected wire sizes because it may be compressed, so we just // check if they are recorded. - assertNotNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES)); - assertNotNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES)); } if (!server) { assertEquals( - requests.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_COUNT)); + requests.size(), + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); assertEquals( - responses.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_COUNT)); - assertEquals(uncompressedRequestsSize, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); - assertEquals(uncompressedResponsesSize, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); - assertNotNull(record.getMetric(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + responses.size(), + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertEquals( + uncompressedRequestsSize, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals( + uncompressedResponsesSize, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); // It's impossible to get the expected wire sizes because it may be compressed, so we just // check if they are recorded. - assertNotNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); - assertNotNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); } } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 64bbd9e86e..bbe60cc3ee 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -365,7 +365,8 @@ public class TestServiceClient { } builder = okBuilder; } - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, createClientCensusStatsModule()); return builder.build(); } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java index 50cedf6491..464c970418 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java @@ -48,7 +48,8 @@ public class AutoWindowSizingOnTest extends AbstractInteropTest { NettyChannelBuilder builder = NettyChannelBuilder.forAddress("localhost", getPort()) .negotiationType(NegotiationType.PLAINTEXT) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, createClientCensusStatsModule()); return builder.build(); } } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java index ff4f0989d8..8076a96cf9 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java @@ -59,7 +59,8 @@ public class Http2NettyLocalChannelTest extends AbstractInteropTest { .channelType(LocalChannel.class) .flowControlWindow(65 * 1024) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, createClientCensusStatsModule()); return builder.build(); } } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java index f23f7e775e..8bbaba6ec8 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java @@ -80,7 +80,8 @@ public class Http2NettyTest extends AbstractInteropTest { .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE) .sslProvider(SslProvider.OPENSSL) .build()); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, createClientCensusStatsModule()); return builder.build(); } catch (Exception ex) { throw new RuntimeException(ex); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java index 083c6ead49..f2850a0f2d 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java @@ -105,7 +105,8 @@ public class Http2OkHttpTest extends AbstractInteropTest { .build()) .overrideAuthority(GrpcUtil.authorityFromHostAndPort( TestUtils.TEST_SERVER_HOST, getPort())); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, createClientCensusStatsModule()); try { builder.sslSocketFactory(TestUtils.newSslSocketFactoryForCa(Platform.get().getProvider(), TestUtils.loadCert("ca.pem"))); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java index 786ed46bda..e7db3a6fcc 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java @@ -44,7 +44,8 @@ public class InProcessTest extends AbstractInteropTest { @Override protected ManagedChannel createChannel() { InProcessChannelBuilder builder = InProcessChannelBuilder.forName(SERVER_NAME); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, createClientCensusStatsModule()); return builder.build(); } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java index 4034d5b962..7cca4a8b06 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java @@ -172,7 +172,8 @@ public class TransportCompressionTest extends AbstractInteropTest { } }) .usePlaintext(true); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, createClientCensusStatsModule()); return builder.build(); } diff --git a/testing/src/main/java/io/grpc/internal/TestingAccessor.java b/testing/src/main/java/io/grpc/internal/TestingAccessor.java index 71f3f5d734..d2df039a78 100644 --- a/testing/src/main/java/io/grpc/internal/TestingAccessor.java +++ b/testing/src/main/java/io/grpc/internal/TestingAccessor.java @@ -16,26 +16,24 @@ package io.grpc.internal; -import com.google.instrumentation.stats.StatsContextFactory; - /** * Test helper that allows accessing package-private stuff. */ public final class TestingAccessor { /** - * Sets a custom {@link StatsContextFactory} for tests. + * Sets a custom stats implementation for tests. */ - public static void setStatsContextFactory( - AbstractManagedChannelImplBuilder builder, StatsContextFactory factory) { - builder.statsContextFactory(factory); + public static void setStatsImplementation( + AbstractManagedChannelImplBuilder builder, CensusStatsModule censusStats) { + builder.overrideCensusStatsModule(censusStats); } /** - * Sets a custom {@link StatsContextFactory} for tests. + * Sets a custom stats implementation for tests. */ - public static void setStatsContextFactory( - AbstractServerImplBuilder builder, StatsContextFactory factory) { - builder.statsContextFactory(factory); + public static void setStatsImplementation( + AbstractServerImplBuilder builder, CensusStatsModule censusStats) { + builder.overrideCensusStatsModule(censusStats); } private TestingAccessor() { diff --git a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java index 441bef3c58..77fc9405da 100644 --- a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java +++ b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java @@ -19,15 +19,23 @@ package io.grpc.internal.testing; import static com.google.common.base.Charsets.UTF_8; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; -import com.google.instrumentation.stats.MeasurementDescriptor; -import com.google.instrumentation.stats.MeasurementMap; -import com.google.instrumentation.stats.MeasurementValue; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.StatsContextFactory; -import com.google.instrumentation.stats.TagKey; -import com.google.instrumentation.stats.TagValue; -import io.grpc.internal.IoUtils; +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; +import io.opencensus.common.Scope; +import io.opencensus.stats.Measure; +import io.opencensus.stats.MeasureMap; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.Tag; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.propagation.TagContextBinarySerializer; +import io.opencensus.tags.propagation.TagContextDeserializationException; +import io.opencensus.tags.unsafe.ContextUtils; import io.opencensus.trace.Annotation; import io.opencensus.trace.AttributeValue; import io.opencensus.trace.EndSpanOptions; @@ -40,10 +48,8 @@ import io.opencensus.trace.SpanContext; import io.opencensus.trace.SpanId; import io.opencensus.trace.TraceId; import io.opencensus.trace.TraceOptions; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.EnumSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; @@ -57,10 +63,12 @@ public class StatsTestUtils { } public static class MetricsRecord { - public final ImmutableMap tags; - public final MeasurementMap metrics; - private MetricsRecord(ImmutableMap tags, MeasurementMap metrics) { + public final ImmutableMap tags; + public final ImmutableMap metrics; + + private MetricsRecord( + ImmutableMap tags, ImmutableMap metrics) { this.tags = tags; this.metrics = metrics; } @@ -69,10 +77,16 @@ public class StatsTestUtils { * Returns the value of a metric, or {@code null} if not found. */ @Nullable - public Double getMetric(MeasurementDescriptor metricName) { - for (MeasurementValue m : metrics) { - if (m.getMeasurement().equals(metricName)) { - return m.getValue(); + public Double getMetric(Measure measure) { + for (Map.Entry m : metrics.entrySet()) { + if (m.getKey().equals(measure)) { + Number value = m.getValue(); + if (value instanceof Double) { + return (Double) value; + } else if (value instanceof Long) { + return (double) (Long) value; + } + throw new AssertionError("Unexpected measure value type: " + value.getClass().getName()); } } return null; @@ -81,9 +95,9 @@ public class StatsTestUtils { /** * Returns the value of a metric converted to long, or throw if not found. */ - public long getMetricAsLongOrFail(MeasurementDescriptor metricName) { - Double doubleValue = getMetric(metricName); - checkNotNull(doubleValue, "Metric not found: %s", metricName.toString()); + public long getMetricAsLongOrFail(Measure measure) { + Double doubleValue = getMetric(measure); + checkNotNull(doubleValue, "Measure not found: %s", measure.getName()); long longValue = (long) (Math.abs(doubleValue) + 0.0001); if (doubleValue < 0) { longValue = -longValue; @@ -93,39 +107,28 @@ public class StatsTestUtils { } /** - * This tag will be propagated by {@link FakeStatsContextFactory} on the wire. + * This tag will be propagated by {@link FakeTagger} on the wire. */ public static final TagKey EXTRA_TAG = TagKey.create("/rpc/test/extratag"); private static final String EXTRA_TAG_HEADER_VALUE_PREFIX = "extratag:"; - private static final String NO_EXTRA_TAG_HEADER_VALUE_PREFIX = "noextratag"; /** - * A factory that makes fake {@link StatsContext}s and saves the created contexts to be - * accessible from {@link #pollContextOrFail}. The contexts it has created would save metrics - * records to be accessible from {@link #pollRecord()} and {@link #pollRecord(long, TimeUnit)}, - * until {@link #rolloverRecords} is called. + * A {@link Tagger} implementation that saves metrics records to be accessible from {@link + * #pollRecord()} and {@link #pollRecord(long, TimeUnit)}, until {@link #rolloverRecords} is + * called. */ - public static final class FakeStatsContextFactory extends StatsContextFactory { - private BlockingQueue records; - public final BlockingQueue contexts = - new LinkedBlockingQueue(); - private final FakeStatsContext defaultContext; + public static final class FakeStatsRecorder extends StatsRecorder { - /** - * Constructor. - */ - public FakeStatsContextFactory() { - rolloverRecords(); - defaultContext = new FakeStatsContext(ImmutableMap.of(), this); - // The records on the default context is not visible from pollRecord(), just like it's - // not visible from pollContextOrFail() either. + private BlockingQueue records; + + public FakeStatsRecorder() { rolloverRecords(); } - public StatsContext pollContextOrFail() { - StatsContext cc = contexts.poll(); - return checkNotNull(cc); + @Override + public MeasureMap newMeasureMap() { + return new FakeStatsRecord(this); } public MetricsRecord pollRecord() { @@ -136,31 +139,8 @@ public class StatsTestUtils { return getCurrentRecordSink().poll(timeout, unit); } - @Override - public StatsContext deserialize(InputStream buffer) throws IOException { - String serializedString; - try { - serializedString = new String(IoUtils.toByteArray(buffer), UTF_8); - } catch (IOException e) { - throw new RuntimeException(e); - } - if (serializedString.startsWith(EXTRA_TAG_HEADER_VALUE_PREFIX)) { - return getDefault().with(EXTRA_TAG, - TagValue.create(serializedString.substring(EXTRA_TAG_HEADER_VALUE_PREFIX.length()))); - } else if (serializedString.startsWith(NO_EXTRA_TAG_HEADER_VALUE_PREFIX)) { - return getDefault(); - } else { - throw new IOException("Malformed value"); - } - } - - @Override - public FakeStatsContext getDefault() { - return defaultContext; - } - /** - * Disconnect this factory with the contexts it has created so far. The records from those + * Disconnect this tagger with the contexts it has created so far. The records from those * contexts will not show up in {@link #pollRecord}. Useful for isolating the records between * test cases. */ @@ -174,45 +154,111 @@ public class StatsTestUtils { } } - public static final class FakeStatsContext extends StatsContext { - private final ImmutableMap tags; - private final FakeStatsContextFactory factory; + public static final class FakeTagger extends Tagger { + + @Override + public FakeTagContext empty() { + return FakeTagContext.EMPTY; + } + + @Override + public TagContext getCurrentTagContext() { + return ContextUtils.TAG_CONTEXT_KEY.get(); + } + + @Override + public TagContextBuilder emptyBuilder() { + return new FakeTagContextBuilder(ImmutableMap.of()); + } + + @Override + public FakeTagContextBuilder toBuilder(TagContext tags) { + return new FakeTagContextBuilder(getTags(tags)); + } + + @Override + public TagContextBuilder currentBuilder() { + throw new UnsupportedOperationException(); + } + + @Override + public Scope withTagContext(TagContext tags) { + throw new UnsupportedOperationException(); + } + } + + public static final class FakeTagContextBinarySerializer extends TagContextBinarySerializer { + + private final FakeTagger tagger = new FakeTagger(); + + @Override + public TagContext fromByteArray(byte[] bytes) throws TagContextDeserializationException { + String serializedString = new String(bytes, UTF_8); + if (serializedString.startsWith(EXTRA_TAG_HEADER_VALUE_PREFIX)) { + return tagger.emptyBuilder() + .put(EXTRA_TAG, + TagValue.create(serializedString.substring(EXTRA_TAG_HEADER_VALUE_PREFIX.length()))) + .build(); + } else { + throw new TagContextDeserializationException("Malformed value"); + } + } + + @Override + public byte[] toByteArray(TagContext tags) { + TagValue extraTagValue = getTags(tags).get(EXTRA_TAG); + if (extraTagValue == null) { + throw new UnsupportedOperationException("TagContext must contain EXTRA_TAG"); + } + return (EXTRA_TAG_HEADER_VALUE_PREFIX + extraTagValue.asString()).getBytes(UTF_8); + } + } + + public static final class FakeStatsRecord extends MeasureMap { + private final BlockingQueue recordSink; + public final Map metrics = Maps.newHashMap(); - private FakeStatsContext(ImmutableMap tags, - FakeStatsContextFactory factory) { - this.tags = tags; - this.factory = factory; - this.recordSink = factory.getCurrentRecordSink(); - } - - public Map getTags() { - return tags; + private FakeStatsRecord(FakeStatsRecorder statsRecorder) { + this.recordSink = statsRecorder.getCurrentRecordSink(); } @Override - public Builder builder() { - return new FakeStatsContextBuilder(this); - } - - @Override - public StatsContext record(MeasurementMap metrics) { - recordSink.add(new MetricsRecord(tags, metrics)); + public MeasureMap put(Measure.MeasureDouble measure, double value) { + metrics.put(measure, value); return this; } @Override - public void serialize(OutputStream os) { - TagValue extraTagValue = tags.get(EXTRA_TAG); - try { - if (extraTagValue == null) { - os.write(NO_EXTRA_TAG_HEADER_VALUE_PREFIX.getBytes(UTF_8)); - } else { - os.write((EXTRA_TAG_HEADER_VALUE_PREFIX + extraTagValue.toString()).getBytes(UTF_8)); - } - } catch (IOException e) { - throw new RuntimeException(e); - } + public MeasureMap put(Measure.MeasureLong measure, long value) { + metrics.put(measure, value); + return this; + } + + @Override + public void record(TagContext tags) { + recordSink.add(new MetricsRecord(getTags(tags), ImmutableMap.copyOf(metrics))); + } + + @Override + public void record() { + throw new UnsupportedOperationException(); + } + } + + public static final class FakeTagContext extends TagContext { + + private static final FakeTagContext EMPTY = + new FakeTagContext(ImmutableMap.of()); + + private final ImmutableMap tags; + + private FakeTagContext(ImmutableMap tags) { + this.tags = tags; + } + + public ImmutableMap getTags() { + return tags; } @Override @@ -221,41 +267,55 @@ public class StatsTestUtils { } @Override - public boolean equals(Object other) { - if (!(other instanceof FakeStatsContext)) { - return false; - } - FakeStatsContext otherCtx = (FakeStatsContext) other; - return tags.equals(otherCtx.tags); - } - - @Override - public int hashCode() { - return tags.hashCode(); + protected Iterator getIterator() { + return Iterators.transform( + tags.entrySet().iterator(), + new Function, Tag>() { + @Override + public Tag apply(@Nullable Map.Entry entry) { + return Tag.create(entry.getKey(), entry.getValue()); + } + }); } } - private static class FakeStatsContextBuilder extends StatsContext.Builder { - private final ImmutableMap.Builder tagsBuilder = ImmutableMap.builder(); - private final FakeStatsContext base; + public static class FakeTagContextBuilder extends TagContextBuilder { - private FakeStatsContextBuilder(FakeStatsContext base) { - this.base = base; - tagsBuilder.putAll(base.tags); + private final Map tagsBuilder = Maps.newHashMap(); + + private FakeTagContextBuilder(Map tags) { + tagsBuilder.putAll(tags); } @Override - public StatsContext.Builder set(TagKey key, TagValue value) { + public TagContextBuilder put(TagKey key, TagValue value) { tagsBuilder.put(key, value); return this; } @Override - public StatsContext build() { - FakeStatsContext context = new FakeStatsContext(tagsBuilder.build(), base.factory); - base.factory.contexts.add(context); + public TagContextBuilder remove(TagKey key) { + tagsBuilder.remove(key); + return this; + } + + @Override + public TagContext build() { + FakeTagContext context = new FakeTagContext(ImmutableMap.copyOf(tagsBuilder)); return context; } + + @Override + public Scope buildScoped() { + throw new UnsupportedOperationException(); + } + } + + // This method handles the default TagContext, which isn't an instance of FakeTagContext. + private static ImmutableMap getTags(TagContext tags) { + return tags instanceof FakeTagContext + ? ((FakeTagContext) tags).getTags() + : ImmutableMap.of(); } // TODO(bdrutu): Remove this class after OpenCensus releases support for this class.