diff --git a/benchmarks/src/jmh/java/io/grpc/internal/StatsTraceContextBenchmark.java b/benchmarks/src/jmh/java/io/grpc/internal/StatsTraceContextBenchmark.java new file mode 100644 index 0000000000..2a4bcdb0e2 --- /dev/null +++ b/benchmarks/src/jmh/java/io/grpc/internal/StatsTraceContextBenchmark.java @@ -0,0 +1,85 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; + +/** + * Benchmark for {@link StatsTraceContext}. + */ +@State(Scope.Benchmark) +public class StatsTraceContextBenchmark { + + private final String methodName = MethodDescriptor.generateFullMethodName("service", "method"); + + private final Supplier stopWatches = new Supplier() { + + @Override + public Stopwatch get() { + return Stopwatch.createUnstarted(); + } + }; + + private final Metadata emptyMetadata = new Metadata(); + + /** + * Javadoc comment. + */ + @Benchmark + @BenchmarkMode(Mode.SampleTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public StatsTraceContext newClientContext() { + return StatsTraceContext.newClientContext( + methodName, NoopStatsContextFactory.INSTANCE, stopWatches); + } + + /** + * Javadoc comment. + */ + @Benchmark + @BenchmarkMode(Mode.SampleTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public StatsTraceContext newServerContext_empty() { + return StatsTraceContext.newServerContext( + methodName, NoopStatsContextFactory.INSTANCE, emptyMetadata, stopWatches); + } +} diff --git a/core/src/main/java/io/grpc/internal/StatsTraceContext.java b/core/src/main/java/io/grpc/internal/StatsTraceContext.java index 43a793da82..df949b4255 100644 --- a/core/src/main/java/io/grpc/internal/StatsTraceContext.java +++ b/core/src/main/java/io/grpc/internal/StatsTraceContext.java @@ -49,6 +49,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /** * The stats and tracing information for a call. @@ -59,13 +60,13 @@ import java.util.concurrent.atomic.AtomicBoolean; * #wireBytesReceived} and {@link #wireBytesSent} can be called concurrently. {@link #callEnded} * can be called concurrently with itself and the other methods. */ -@SuppressWarnings("NonAtomicVolatileUpdate") public final class StatsTraceContext { public static final StatsTraceContext NOOP = StatsTraceContext.newClientContext( "noopservice/noopmethod", NoopStatsContextFactory.INSTANCE, GrpcUtil.STOPWATCH_SUPPLIER); - private static final double NANOS_PER_MILLI = 1000 * 1000; + private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS.toNanos(1); + private static final long UNSET_CLIENT_PENDING_NANOS = -1; private enum Side { CLIENT, SERVER @@ -75,11 +76,11 @@ public final class StatsTraceContext { private final Stopwatch stopwatch; private final Side side; private final Metadata.Key statsHeader; - private volatile long clientPendingNanos = -1; - private volatile long wireBytesSent; - private volatile long wireBytesReceived; - private volatile long uncompressedBytesSent; - private volatile long uncompressedBytesReceived; + private final AtomicLong clientPendingNanos = new AtomicLong(UNSET_CLIENT_PENDING_NANOS); + private final AtomicLong wireBytesSent = new AtomicLong(); + private final AtomicLong wireBytesReceived = new AtomicLong(); + private final AtomicLong uncompressedBytesSent = new AtomicLong(); + private final AtomicLong uncompressedBytesReceived = new AtomicLong(); private final AtomicBoolean callEnded = new AtomicBoolean(false); private StatsTraceContext(Side side, String fullMethodName, StatsContext parentCtx, @@ -180,14 +181,14 @@ public final class StatsTraceContext { * Record the outgoing number of payload bytes as on the wire. */ void wireBytesSent(long bytes) { - wireBytesSent += bytes; + wireBytesSent.addAndGet(bytes); } /** * Record the incoming number of payload bytes as on the wire. */ void wireBytesReceived(long bytes) { - wireBytesReceived += bytes; + wireBytesReceived.addAndGet(bytes); } /** @@ -196,7 +197,7 @@ public final class StatsTraceContext { *

The time this method is called is unrelated to the actual time when those byte are sent. */ void uncompressedBytesSent(long bytes) { - uncompressedBytesSent += bytes; + uncompressedBytesSent.addAndGet(bytes); } /** @@ -205,7 +206,7 @@ public final class StatsTraceContext { *

The time this method is called is unrelated to the actual time when those byte are received. */ void uncompressedBytesReceived(long bytes) { - uncompressedBytesReceived += bytes; + uncompressedBytesReceived.addAndGet(bytes); } /** @@ -215,8 +216,9 @@ public final class StatsTraceContext { */ public void clientHeadersSent() { Preconditions.checkState(side == Side.CLIENT, "Must be called on client-side"); - if (clientPendingNanos < 0) { - clientPendingNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); + if (clientPendingNanos.get() == UNSET_CLIENT_PENDING_NANOS) { + clientPendingNanos.compareAndSet( + UNSET_CLIENT_PENDING_NANOS, stopwatch.elapsed(TimeUnit.NANOSECONDS)); } } @@ -255,18 +257,19 @@ public final class StatsTraceContext { long roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); MeasurementMap.Builder builder = MeasurementMap.builder() .put(latencyMetric, roundtripNanos / NANOS_PER_MILLI) // in double - .put(wireBytesSentMetric, wireBytesSent) - .put(wireBytesReceivedMetric, wireBytesReceived) - .put(uncompressedBytesSentMetric, uncompressedBytesSent) - .put(uncompressedBytesReceivedMetric, uncompressedBytesReceived); + .put(wireBytesSentMetric, wireBytesSent.get()) + .put(wireBytesReceivedMetric, wireBytesReceived.get()) + .put(uncompressedBytesSentMetric, uncompressedBytesSent.get()) + .put(uncompressedBytesReceivedMetric, uncompressedBytesReceived.get()); if (!status.isOk()) { builder.put(errorCountMetric, 1.0); } if (side == Side.CLIENT) { - if (clientPendingNanos >= 0) { + long localClientPendingNanos = clientPendingNanos.get(); + if (localClientPendingNanos != UNSET_CLIENT_PENDING_NANOS) { builder.put( RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME, - (roundtripNanos - clientPendingNanos) / NANOS_PER_MILLI); // in double + (roundtripNanos - localClientPendingNanos) / NANOS_PER_MILLI); // in double } } statsCtx.with(RpcConstants.RPC_STATUS, TagValue.create(status.getCode().toString()))