mirror of https://github.com/grpc/grpc-java.git
core,benchmarks: use Atomics for StatsTraceContext
This removes a needless warning, and isn't much slower. Also this includes a benchmark for StatsTraceContext to measure the overhead for creation. It adds about 40ns per RPC. Optimization will come after structural changes are made to break the dependency on Census.
This commit is contained in:
parent
48a32fbeaa
commit
7a73bf1068
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Copyright 2017, Google Inc. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
*
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package io.grpc.internal;
|
||||
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Supplier;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Mode;
|
||||
import org.openjdk.jmh.annotations.OutputTimeUnit;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
|
||||
/**
|
||||
* Benchmark for {@link StatsTraceContext}.
|
||||
*/
|
||||
@State(Scope.Benchmark)
|
||||
public class StatsTraceContextBenchmark {
|
||||
|
||||
private final String methodName = MethodDescriptor.generateFullMethodName("service", "method");
|
||||
|
||||
private final Supplier<Stopwatch> stopWatches = new Supplier<Stopwatch>() {
|
||||
|
||||
@Override
|
||||
public Stopwatch get() {
|
||||
return Stopwatch.createUnstarted();
|
||||
}
|
||||
};
|
||||
|
||||
private final Metadata emptyMetadata = new Metadata();
|
||||
|
||||
/**
|
||||
* Javadoc comment.
|
||||
*/
|
||||
@Benchmark
|
||||
@BenchmarkMode(Mode.SampleTime)
|
||||
@OutputTimeUnit(TimeUnit.NANOSECONDS)
|
||||
public StatsTraceContext newClientContext() {
|
||||
return StatsTraceContext.newClientContext(
|
||||
methodName, NoopStatsContextFactory.INSTANCE, stopWatches);
|
||||
}
|
||||
|
||||
/**
|
||||
* Javadoc comment.
|
||||
*/
|
||||
@Benchmark
|
||||
@BenchmarkMode(Mode.SampleTime)
|
||||
@OutputTimeUnit(TimeUnit.NANOSECONDS)
|
||||
public StatsTraceContext newServerContext_empty() {
|
||||
return StatsTraceContext.newServerContext(
|
||||
methodName, NoopStatsContextFactory.INSTANCE, emptyMetadata, stopWatches);
|
||||
}
|
||||
}
|
|
@ -49,6 +49,7 @@ import java.io.ByteArrayOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* The stats and tracing information for a call.
|
||||
|
@ -59,13 +60,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
* #wireBytesReceived} and {@link #wireBytesSent} can be called concurrently. {@link #callEnded}
|
||||
* can be called concurrently with itself and the other methods.
|
||||
*/
|
||||
@SuppressWarnings("NonAtomicVolatileUpdate")
|
||||
public final class StatsTraceContext {
|
||||
public static final StatsTraceContext NOOP = StatsTraceContext.newClientContext(
|
||||
"noopservice/noopmethod", NoopStatsContextFactory.INSTANCE,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
|
||||
private static final double NANOS_PER_MILLI = 1000 * 1000;
|
||||
private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS.toNanos(1);
|
||||
private static final long UNSET_CLIENT_PENDING_NANOS = -1;
|
||||
|
||||
private enum Side {
|
||||
CLIENT, SERVER
|
||||
|
@ -75,11 +76,11 @@ public final class StatsTraceContext {
|
|||
private final Stopwatch stopwatch;
|
||||
private final Side side;
|
||||
private final Metadata.Key<StatsContext> statsHeader;
|
||||
private volatile long clientPendingNanos = -1;
|
||||
private volatile long wireBytesSent;
|
||||
private volatile long wireBytesReceived;
|
||||
private volatile long uncompressedBytesSent;
|
||||
private volatile long uncompressedBytesReceived;
|
||||
private final AtomicLong clientPendingNanos = new AtomicLong(UNSET_CLIENT_PENDING_NANOS);
|
||||
private final AtomicLong wireBytesSent = new AtomicLong();
|
||||
private final AtomicLong wireBytesReceived = new AtomicLong();
|
||||
private final AtomicLong uncompressedBytesSent = new AtomicLong();
|
||||
private final AtomicLong uncompressedBytesReceived = new AtomicLong();
|
||||
private final AtomicBoolean callEnded = new AtomicBoolean(false);
|
||||
|
||||
private StatsTraceContext(Side side, String fullMethodName, StatsContext parentCtx,
|
||||
|
@ -180,14 +181,14 @@ public final class StatsTraceContext {
|
|||
* Record the outgoing number of payload bytes as on the wire.
|
||||
*/
|
||||
void wireBytesSent(long bytes) {
|
||||
wireBytesSent += bytes;
|
||||
wireBytesSent.addAndGet(bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Record the incoming number of payload bytes as on the wire.
|
||||
*/
|
||||
void wireBytesReceived(long bytes) {
|
||||
wireBytesReceived += bytes;
|
||||
wireBytesReceived.addAndGet(bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -196,7 +197,7 @@ public final class StatsTraceContext {
|
|||
* <p>The time this method is called is unrelated to the actual time when those byte are sent.
|
||||
*/
|
||||
void uncompressedBytesSent(long bytes) {
|
||||
uncompressedBytesSent += bytes;
|
||||
uncompressedBytesSent.addAndGet(bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -205,7 +206,7 @@ public final class StatsTraceContext {
|
|||
* <p>The time this method is called is unrelated to the actual time when those byte are received.
|
||||
*/
|
||||
void uncompressedBytesReceived(long bytes) {
|
||||
uncompressedBytesReceived += bytes;
|
||||
uncompressedBytesReceived.addAndGet(bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -215,8 +216,9 @@ public final class StatsTraceContext {
|
|||
*/
|
||||
public void clientHeadersSent() {
|
||||
Preconditions.checkState(side == Side.CLIENT, "Must be called on client-side");
|
||||
if (clientPendingNanos < 0) {
|
||||
clientPendingNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
|
||||
if (clientPendingNanos.get() == UNSET_CLIENT_PENDING_NANOS) {
|
||||
clientPendingNanos.compareAndSet(
|
||||
UNSET_CLIENT_PENDING_NANOS, stopwatch.elapsed(TimeUnit.NANOSECONDS));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -255,18 +257,19 @@ public final class StatsTraceContext {
|
|||
long roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
|
||||
MeasurementMap.Builder builder = MeasurementMap.builder()
|
||||
.put(latencyMetric, roundtripNanos / NANOS_PER_MILLI) // in double
|
||||
.put(wireBytesSentMetric, wireBytesSent)
|
||||
.put(wireBytesReceivedMetric, wireBytesReceived)
|
||||
.put(uncompressedBytesSentMetric, uncompressedBytesSent)
|
||||
.put(uncompressedBytesReceivedMetric, uncompressedBytesReceived);
|
||||
.put(wireBytesSentMetric, wireBytesSent.get())
|
||||
.put(wireBytesReceivedMetric, wireBytesReceived.get())
|
||||
.put(uncompressedBytesSentMetric, uncompressedBytesSent.get())
|
||||
.put(uncompressedBytesReceivedMetric, uncompressedBytesReceived.get());
|
||||
if (!status.isOk()) {
|
||||
builder.put(errorCountMetric, 1.0);
|
||||
}
|
||||
if (side == Side.CLIENT) {
|
||||
if (clientPendingNanos >= 0) {
|
||||
long localClientPendingNanos = clientPendingNanos.get();
|
||||
if (localClientPendingNanos != UNSET_CLIENT_PENDING_NANOS) {
|
||||
builder.put(
|
||||
RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME,
|
||||
(roundtripNanos - clientPendingNanos) / NANOS_PER_MILLI); // in double
|
||||
(roundtripNanos - localClientPendingNanos) / NANOS_PER_MILLI); // in double
|
||||
}
|
||||
}
|
||||
statsCtx.with(RpcConstants.RPC_STATUS, TagValue.create(status.getCode().toString()))
|
||||
|
|
Loading…
Reference in New Issue