mirror of https://github.com/grpc/grpc-java.git
core: upgrade census (now named instrumentation) to 0.3.0 (#2565)
In upstream, Census is renamed to "Instrumentation". `com.google.census` is renamed to `com.google.instrumentation.stats`. In gRPC, "census" in every name is replaced by "stats".
This commit is contained in:
parent
322eb8c5c5
commit
cce8eac56d
|
@ -151,7 +151,7 @@ subprojects {
|
|||
google_auth_credentials: 'com.google.auth:google-auth-library-credentials:0.4.0',
|
||||
okhttp: 'com.squareup.okhttp:okhttp:2.5.0',
|
||||
okio: 'com.squareup.okio:okio:1.6.0',
|
||||
census_api: 'com.google.census:census-api:0.2.0',
|
||||
instrumentation_api: 'com.google.instrumentation:instrumentation-api:0.3.0',
|
||||
protobuf: "com.google.protobuf:protobuf-java:${protobufVersion}",
|
||||
// swap to ${protobufVersion} after versions align again
|
||||
protobuf_lite: "com.google.protobuf:protobuf-lite:3.0.1",
|
||||
|
|
|
@ -9,7 +9,7 @@ dependencies {
|
|||
libraries.errorprone,
|
||||
libraries.jsr305,
|
||||
project(':grpc-context'),
|
||||
libraries.census_api
|
||||
libraries.instrumentation_api
|
||||
testCompile project(':grpc-testing')
|
||||
}
|
||||
|
||||
|
|
|
@ -31,15 +31,15 @@
|
|||
|
||||
package io.grpc.inprocess;
|
||||
|
||||
import com.google.census.CensusContextFactory;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.instrumentation.stats.StatsContextFactory;
|
||||
|
||||
import io.grpc.ExperimentalApi;
|
||||
import io.grpc.Internal;
|
||||
import io.grpc.internal.AbstractManagedChannelImplBuilder;
|
||||
import io.grpc.internal.ClientTransportFactory;
|
||||
import io.grpc.internal.ConnectionClientTransport;
|
||||
import io.grpc.internal.NoopCensusContextFactory;
|
||||
import io.grpc.internal.NoopStatsContextFactory;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
|
||||
|
@ -68,9 +68,9 @@ public class InProcessChannelBuilder extends
|
|||
super(new InProcessSocketAddress(name), "localhost");
|
||||
this.name = Preconditions.checkNotNull(name, "name");
|
||||
// TODO(zhangkun83): InProcessTransport by-passes framer and deframer, thus message sizses are
|
||||
// not counted. Therefore, we disable Census for now.
|
||||
// not counted. Therefore, we disable stats for now.
|
||||
// (https://github.com/grpc/grpc-java/issues/2284)
|
||||
super.censusContextFactory(NoopCensusContextFactory.INSTANCE);
|
||||
super.statsContextFactory(NoopStatsContextFactory.INSTANCE);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -94,9 +94,9 @@ public class InProcessChannelBuilder extends
|
|||
|
||||
@Internal
|
||||
@Override
|
||||
public InProcessChannelBuilder censusContextFactory(CensusContextFactory censusFactory) {
|
||||
public InProcessChannelBuilder statsContextFactory(StatsContextFactory statsFactory) {
|
||||
// TODO(zhangkun83): InProcessTransport by-passes framer and deframer, thus message sizses are
|
||||
// not counted. Census is disabled by using a NOOP Census factory in the constructor, and here
|
||||
// not counted. Stats is disabled by using a NOOP stats factory in the constructor, and here
|
||||
// we prevent the user from overriding it.
|
||||
// (https://github.com/grpc/grpc-java/issues/2284)
|
||||
return this;
|
||||
|
|
|
@ -31,13 +31,13 @@
|
|||
|
||||
package io.grpc.inprocess;
|
||||
|
||||
import com.google.census.CensusContextFactory;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.instrumentation.stats.StatsContextFactory;
|
||||
|
||||
import io.grpc.ExperimentalApi;
|
||||
import io.grpc.Internal;
|
||||
import io.grpc.internal.AbstractServerImplBuilder;
|
||||
import io.grpc.internal.NoopCensusContextFactory;
|
||||
import io.grpc.internal.NoopStatsContextFactory;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
|
@ -65,9 +65,9 @@ public final class InProcessServerBuilder
|
|||
private InProcessServerBuilder(String name) {
|
||||
this.name = Preconditions.checkNotNull(name, "name");
|
||||
// TODO(zhangkun83): InProcessTransport by-passes framer and deframer, thus message sizses are
|
||||
// not counted. Therefore, we disable Census for now.
|
||||
// not counted. Therefore, we disable stats for now.
|
||||
// (https://github.com/grpc/grpc-java/issues/2284)
|
||||
super.censusContextFactory(NoopCensusContextFactory.INSTANCE);
|
||||
super.statsContextFactory(NoopStatsContextFactory.INSTANCE);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -82,9 +82,9 @@ public final class InProcessServerBuilder
|
|||
|
||||
@Internal
|
||||
@Override
|
||||
public InProcessServerBuilder censusContextFactory(CensusContextFactory censusFactory) {
|
||||
public InProcessServerBuilder statsContextFactory(StatsContextFactory statsFactory) {
|
||||
// TODO(zhangkun83): InProcessTransport by-passes framer and deframer, thus message sizses are
|
||||
// not counted. Census is disabled by using a NOOP Census factory in the constructor, and here
|
||||
// not counted. Stats is disabled by using a NOOP stats factory in the constructor, and here
|
||||
// we prevent the user from overriding it.
|
||||
// (https://github.com/grpc/grpc-java/issues/2284)
|
||||
return this;
|
||||
|
|
|
@ -34,11 +34,11 @@ package io.grpc.internal;
|
|||
import static com.google.common.base.MoreObjects.firstNonNull;
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
import com.google.census.Census;
|
||||
import com.google.census.CensusContextFactory;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.instrumentation.stats.Stats;
|
||||
import com.google.instrumentation.stats.StatsContextFactory;
|
||||
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.ClientInterceptor;
|
||||
|
@ -136,7 +136,7 @@ public abstract class AbstractManagedChannelImplBuilder
|
|||
}
|
||||
|
||||
@Nullable
|
||||
private CensusContextFactory censusFactory;
|
||||
private StatsContextFactory statsFactory;
|
||||
|
||||
protected AbstractManagedChannelImplBuilder(String target) {
|
||||
this.target = Preconditions.checkNotNull(target, "target");
|
||||
|
@ -242,12 +242,12 @@ public abstract class AbstractManagedChannelImplBuilder
|
|||
}
|
||||
|
||||
/**
|
||||
* Override the default Census implementation. This is meant to be used in tests.
|
||||
* Override the default stats implementation. This is meant to be used in tests.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
@Internal
|
||||
public T censusContextFactory(CensusContextFactory censusFactory) {
|
||||
this.censusFactory = censusFactory;
|
||||
public T statsContextFactory(StatsContextFactory statsFactory) {
|
||||
this.statsFactory = statsFactory;
|
||||
return thisT();
|
||||
}
|
||||
|
||||
|
@ -291,8 +291,8 @@ public abstract class AbstractManagedChannelImplBuilder
|
|||
firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
|
||||
GrpcUtil.TIMER_SERVICE, GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
|
||||
executor, userAgent, interceptors,
|
||||
firstNonNull(censusFactory,
|
||||
firstNonNull(Census.getCensusContextFactory(), NoopCensusContextFactory.INSTANCE)));
|
||||
firstNonNull(statsFactory,
|
||||
firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -34,10 +34,10 @@ package io.grpc.internal;
|
|||
import static com.google.common.base.MoreObjects.firstNonNull;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
import com.google.census.Census;
|
||||
import com.google.census.CensusContextFactory;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.instrumentation.stats.Stats;
|
||||
import com.google.instrumentation.stats.StatsContextFactory;
|
||||
|
||||
import io.grpc.BindableService;
|
||||
import io.grpc.CompressorRegistry;
|
||||
|
@ -101,7 +101,7 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
|
|||
private CompressorRegistry compressorRegistry;
|
||||
|
||||
@Nullable
|
||||
private CensusContextFactory censusFactory;
|
||||
private StatsContextFactory statsFactory;
|
||||
|
||||
@Override
|
||||
public final T directExecutor() {
|
||||
|
@ -153,12 +153,12 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
|
|||
}
|
||||
|
||||
/**
|
||||
* Override the default Census implementation. This is meant to be used in tests.
|
||||
* Override the default stats implementation. This is meant to be used in tests.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
@Internal
|
||||
public T censusContextFactory(CensusContextFactory censusFactory) {
|
||||
this.censusFactory = censusFactory;
|
||||
public T statsContextFactory(StatsContextFactory statsFactory) {
|
||||
this.statsFactory = statsFactory;
|
||||
return thisT();
|
||||
}
|
||||
|
||||
|
@ -170,8 +170,8 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
|
|||
Context.ROOT, firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
|
||||
firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
|
||||
transportFilters,
|
||||
firstNonNull(censusFactory,
|
||||
firstNonNull(Census.getCensusContextFactory(), NoopCensusContextFactory.INSTANCE)),
|
||||
firstNonNull(statsFactory,
|
||||
firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)),
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
|
||||
notifyTarget.notifyOnBuild(server);
|
||||
|
|
|
@ -35,10 +35,10 @@ import static com.google.common.base.Preconditions.checkArgument;
|
|||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
|
||||
import com.google.census.CensusContextFactory;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.instrumentation.stats.StatsContextFactory;
|
||||
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.CallOptions;
|
||||
|
@ -139,7 +139,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
private final Supplier<Stopwatch> stopwatchSupplier;
|
||||
/** The timout before entering idle mode, less {@link #IDLE_GRACE_PERIOD_MILLIS}. */
|
||||
private final long idleTimeoutMillis;
|
||||
private final CensusContextFactory censusFactory;
|
||||
private final StatsContextFactory statsFactory;
|
||||
|
||||
/**
|
||||
* Executor that runs deadline timers for requests.
|
||||
|
@ -379,7 +379,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
SharedResourceHolder.Resource<ScheduledExecutorService> timerService,
|
||||
Supplier<Stopwatch> stopwatchSupplier, long idleTimeoutMillis,
|
||||
@Nullable Executor executor, @Nullable String userAgent,
|
||||
List<ClientInterceptor> interceptors, CensusContextFactory censusFactory) {
|
||||
List<ClientInterceptor> interceptors, StatsContextFactory statsFactory) {
|
||||
this.target = checkNotNull(target, "target");
|
||||
this.nameResolverFactory = checkNotNull(nameResolverFactory, "nameResolverFactory");
|
||||
this.nameResolverParams = checkNotNull(nameResolverParams, "nameResolverParams");
|
||||
|
@ -411,7 +411,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
this.decompressorRegistry = decompressorRegistry;
|
||||
this.compressorRegistry = compressorRegistry;
|
||||
this.userAgent = userAgent;
|
||||
this.censusFactory = checkNotNull(censusFactory, "censusFactory");
|
||||
this.statsFactory = checkNotNull(statsFactory, "statsFactory");
|
||||
|
||||
if (log.isLoggable(Level.INFO)) {
|
||||
log.log(Level.INFO, "[{0}] Created with target {1}", new Object[] {getLogId(), target});
|
||||
|
@ -615,7 +615,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
executor = ManagedChannelImpl.this.executor;
|
||||
}
|
||||
StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(
|
||||
method.getFullMethodName(), censusFactory, stopwatchSupplier);
|
||||
method.getFullMethodName(), statsFactory, stopwatchSupplier);
|
||||
return new ClientCallImpl<ReqT, RespT>(
|
||||
method,
|
||||
executor,
|
||||
|
@ -725,7 +725,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
@Override
|
||||
public Channel makeChannel(ClientTransport transport) {
|
||||
return new SingleTransportChannel(
|
||||
censusFactory, transport, executor, scheduledExecutor, authority(), stopwatchSupplier);
|
||||
statsFactory, transport, executor, scheduledExecutor, authority(), stopwatchSupplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -31,59 +31,59 @@
|
|||
|
||||
package io.grpc.internal;
|
||||
|
||||
import com.google.census.CensusContext;
|
||||
import com.google.census.CensusContextFactory;
|
||||
import com.google.census.MetricMap;
|
||||
import com.google.census.TagKey;
|
||||
import com.google.census.TagValue;
|
||||
import com.google.instrumentation.stats.MeasurementMap;
|
||||
import com.google.instrumentation.stats.StatsContext;
|
||||
import com.google.instrumentation.stats.StatsContextFactory;
|
||||
import com.google.instrumentation.stats.TagKey;
|
||||
import com.google.instrumentation.stats.TagValue;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
public final class NoopCensusContextFactory extends CensusContextFactory {
|
||||
private static final byte[] SERIALIZED_BYTES = new byte[0];
|
||||
private static final CensusContext DEFAULT_CONTEXT = new NoopCensusContext();
|
||||
private static final CensusContext.Builder BUILDER = new NoopContextBuilder();
|
||||
public final class NoopStatsContextFactory extends StatsContextFactory {
|
||||
private static final StatsContext DEFAULT_CONTEXT = new NoopStatsContext();
|
||||
private static final StatsContext.Builder BUILDER = new NoopContextBuilder();
|
||||
|
||||
public static final CensusContextFactory INSTANCE = new NoopCensusContextFactory();
|
||||
public static final StatsContextFactory INSTANCE = new NoopStatsContextFactory();
|
||||
|
||||
private NoopCensusContextFactory() {
|
||||
private NoopStatsContextFactory() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public CensusContext deserialize(ByteBuffer buffer) {
|
||||
public StatsContext deserialize(InputStream is) {
|
||||
return DEFAULT_CONTEXT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CensusContext getDefault() {
|
||||
public StatsContext getDefault() {
|
||||
return DEFAULT_CONTEXT;
|
||||
}
|
||||
|
||||
private static class NoopCensusContext extends CensusContext {
|
||||
private static class NoopStatsContext extends StatsContext {
|
||||
@Override
|
||||
public Builder builder() {
|
||||
return BUILDER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CensusContext record(MetricMap metrics) {
|
||||
public StatsContext record(MeasurementMap metrics) {
|
||||
return DEFAULT_CONTEXT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer serialize() {
|
||||
return ByteBuffer.wrap(SERIALIZED_BYTES).asReadOnlyBuffer();
|
||||
public void serialize(OutputStream os) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private static class NoopContextBuilder extends CensusContext.Builder {
|
||||
private static class NoopContextBuilder extends StatsContext.Builder {
|
||||
@Override
|
||||
public CensusContext.Builder set(TagKey key, TagValue value) {
|
||||
public StatsContext.Builder set(TagKey key, TagValue value) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CensusContext build() {
|
||||
public StatsContext build() {
|
||||
return DEFAULT_CONTEXT;
|
||||
}
|
||||
}
|
|
@ -39,10 +39,10 @@ import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
|
|||
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
|
||||
import static java.util.concurrent.TimeUnit.NANOSECONDS;
|
||||
|
||||
import com.google.census.CensusContextFactory;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.instrumentation.stats.StatsContextFactory;
|
||||
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.CompressorRegistry;
|
||||
|
@ -95,7 +95,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
|
|||
private final InternalHandlerRegistry registry;
|
||||
private final HandlerRegistry fallbackRegistry;
|
||||
private final List<ServerTransportFilter> transportFilters;
|
||||
private final CensusContextFactory censusFactory;
|
||||
private final StatsContextFactory statsFactory;
|
||||
@GuardedBy("lock") private boolean started;
|
||||
@GuardedBy("lock") private boolean shutdown;
|
||||
/** non-{@code null} if immediate shutdown has been requested. */
|
||||
|
@ -129,7 +129,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
|
|||
ServerImpl(Executor executor, InternalHandlerRegistry registry, HandlerRegistry fallbackRegistry,
|
||||
InternalServer transportServer, Context rootContext,
|
||||
DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
|
||||
List<ServerTransportFilter> transportFilters, CensusContextFactory censusFactory,
|
||||
List<ServerTransportFilter> transportFilters, StatsContextFactory statsFactory,
|
||||
Supplier<Stopwatch> stopwatchSupplier) {
|
||||
this.executor = executor;
|
||||
this.registry = Preconditions.checkNotNull(registry, "registry");
|
||||
|
@ -142,7 +142,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
|
|||
this.compressorRegistry = compressorRegistry;
|
||||
this.transportFilters = Collections.unmodifiableList(
|
||||
new ArrayList<ServerTransportFilter>(transportFilters));
|
||||
this.censusFactory = Preconditions.checkNotNull(censusFactory, "censusFactory");
|
||||
this.statsFactory = Preconditions.checkNotNull(statsFactory, "statsFactory");
|
||||
this.stopwatchSupplier = Preconditions.checkNotNull(stopwatchSupplier, "stopwatchSupplier");
|
||||
}
|
||||
|
||||
|
@ -376,7 +376,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
|
|||
@Override
|
||||
public StatsTraceContext methodDetermined(String methodName, Metadata headers) {
|
||||
return StatsTraceContext.newServerContext(
|
||||
methodName, censusFactory, headers, stopwatchSupplier);
|
||||
methodName, statsFactory, headers, stopwatchSupplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -442,7 +442,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
|
|||
final ServerStream stream, Metadata headers) {
|
||||
Long timeoutNanos = headers.get(TIMEOUT_KEY);
|
||||
|
||||
// TODO(zhangkun83): attach the CensusContext from StatsTraceContext to baseContext
|
||||
// TODO(zhangkun83): attach the StatsContext from StatsTraceContext to baseContext
|
||||
Context baseContext = rootContext;
|
||||
|
||||
if (timeoutNanos == null) {
|
||||
|
|
|
@ -31,10 +31,10 @@
|
|||
|
||||
package io.grpc.internal;
|
||||
|
||||
import com.google.census.CensusContextFactory;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.instrumentation.stats.StatsContextFactory;
|
||||
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
|
@ -50,7 +50,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
*/
|
||||
final class SingleTransportChannel extends Channel {
|
||||
|
||||
private final CensusContextFactory censusFactory;
|
||||
private final StatsContextFactory statsFactory;
|
||||
private final ClientTransport transport;
|
||||
private final Executor executor;
|
||||
private final String authority;
|
||||
|
@ -67,10 +67,10 @@ final class SingleTransportChannel extends Channel {
|
|||
/**
|
||||
* Creates a new channel with a connected transport.
|
||||
*/
|
||||
public SingleTransportChannel(CensusContextFactory censusFactory, ClientTransport transport,
|
||||
public SingleTransportChannel(StatsContextFactory statsFactory, ClientTransport transport,
|
||||
Executor executor, ScheduledExecutorService deadlineCancellationExecutor, String authority,
|
||||
Supplier<Stopwatch> stopwatchSupplier) {
|
||||
this.censusFactory = Preconditions.checkNotNull(censusFactory, "censusFactory");
|
||||
this.statsFactory = Preconditions.checkNotNull(statsFactory, "statsFactory");
|
||||
this.transport = Preconditions.checkNotNull(transport, "transport");
|
||||
this.executor = Preconditions.checkNotNull(executor, "executor");
|
||||
this.deadlineCancellationExecutor = Preconditions.checkNotNull(
|
||||
|
@ -83,7 +83,7 @@ final class SingleTransportChannel extends Channel {
|
|||
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
|
||||
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
|
||||
StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(
|
||||
methodDescriptor.getFullMethodName(), censusFactory, stopwatchSupplier);
|
||||
methodDescriptor.getFullMethodName(), statsFactory, stopwatchSupplier);
|
||||
return new ClientCallImpl<RequestT, ResponseT>(methodDescriptor,
|
||||
new SerializingExecutor(executor), callOptions, statsTraceCtx, transportProvider,
|
||||
deadlineCancellationExecutor);
|
||||
|
|
|
@ -31,21 +31,23 @@
|
|||
|
||||
package io.grpc.internal;
|
||||
|
||||
import com.google.census.CensusContext;
|
||||
import com.google.census.CensusContextFactory;
|
||||
import com.google.census.MetricMap;
|
||||
import com.google.census.MetricName;
|
||||
import com.google.census.RpcConstants;
|
||||
import com.google.census.TagKey;
|
||||
import com.google.census.TagValue;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.instrumentation.stats.MeasurementDescriptor;
|
||||
import com.google.instrumentation.stats.MeasurementMap;
|
||||
import com.google.instrumentation.stats.RpcConstants;
|
||||
import com.google.instrumentation.stats.StatsContext;
|
||||
import com.google.instrumentation.stats.StatsContextFactory;
|
||||
import com.google.instrumentation.stats.TagKey;
|
||||
import com.google.instrumentation.stats.TagValue;
|
||||
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
@ -61,107 +63,113 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
@SuppressWarnings("NonAtomicVolatileUpdate")
|
||||
public final class StatsTraceContext {
|
||||
public static final StatsTraceContext NOOP = StatsTraceContext.newClientContext(
|
||||
"noopservice/noopmethod", NoopCensusContextFactory.INSTANCE,
|
||||
"noopservice/noopmethod", NoopStatsContextFactory.INSTANCE,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
|
||||
private enum Side {
|
||||
CLIENT, SERVER
|
||||
}
|
||||
|
||||
private final CensusContext censusCtx;
|
||||
private final StatsContext statsCtx;
|
||||
private final Stopwatch stopwatch;
|
||||
private final Side side;
|
||||
private final Metadata.Key<CensusContext> censusHeader;
|
||||
private final Metadata.Key<StatsContext> statsHeader;
|
||||
private volatile long wireBytesSent;
|
||||
private volatile long wireBytesReceived;
|
||||
private volatile long uncompressedBytesSent;
|
||||
private volatile long uncompressedBytesReceived;
|
||||
private final AtomicBoolean callEnded = new AtomicBoolean(false);
|
||||
|
||||
private StatsTraceContext(Side side, String fullMethodName, CensusContext parentCtx,
|
||||
Supplier<Stopwatch> stopwatchSupplier, Metadata.Key<CensusContext> censusHeader) {
|
||||
private StatsTraceContext(Side side, String fullMethodName, StatsContext parentCtx,
|
||||
Supplier<Stopwatch> stopwatchSupplier, Metadata.Key<StatsContext> statsHeader) {
|
||||
this.side = side;
|
||||
TagKey methodTagKey =
|
||||
side == Side.CLIENT ? RpcConstants.RPC_CLIENT_METHOD : RpcConstants.RPC_SERVER_METHOD;
|
||||
// TODO(carl-mastrangelo): maybe cache TagValue in MethodDescriptor
|
||||
this.censusCtx = parentCtx.with(methodTagKey, new TagValue(fullMethodName));
|
||||
this.statsCtx = parentCtx.with(methodTagKey, TagValue.create(fullMethodName));
|
||||
this.stopwatch = stopwatchSupplier.get().start();
|
||||
this.censusHeader = censusHeader;
|
||||
this.statsHeader = statsHeader;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a {@code StatsTraceContext} for an outgoing RPC, using the current CensusContext.
|
||||
* Creates a {@code StatsTraceContext} for an outgoing RPC, using the current StatsContext.
|
||||
*
|
||||
* <p>The current time is used as the start time of the RPC.
|
||||
*/
|
||||
public static StatsTraceContext newClientContext(String methodName,
|
||||
CensusContextFactory censusFactory, Supplier<Stopwatch> stopwatchSupplier) {
|
||||
StatsContextFactory statsFactory, Supplier<Stopwatch> stopwatchSupplier) {
|
||||
return new StatsTraceContext(Side.CLIENT, methodName,
|
||||
// TODO(zhangkun83): use the CensusContext out of the current Context
|
||||
censusFactory.getDefault(),
|
||||
stopwatchSupplier, createCensusHeader(censusFactory));
|
||||
// TODO(zhangkun83): use the StatsContext out of the current Context
|
||||
statsFactory.getDefault(),
|
||||
stopwatchSupplier, createStatsHeader(statsFactory));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static StatsTraceContext newClientContextForTesting(String methodName,
|
||||
CensusContextFactory censusFactory, CensusContext parent,
|
||||
StatsContextFactory statsFactory, StatsContext parent,
|
||||
Supplier<Stopwatch> stopwatchSupplier) {
|
||||
return new StatsTraceContext(Side.CLIENT, methodName, parent, stopwatchSupplier,
|
||||
createCensusHeader(censusFactory));
|
||||
createStatsHeader(statsFactory));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a {@code StatsTraceContext} for an incoming RPC, using the CensusContext deserialized
|
||||
* Creates a {@code StatsTraceContext} for an incoming RPC, using the StatsContext deserialized
|
||||
* from the headers.
|
||||
*
|
||||
* <p>The current time is used as the start time of the RPC.
|
||||
*/
|
||||
public static StatsTraceContext newServerContext(String methodName,
|
||||
CensusContextFactory censusFactory, Metadata headers,
|
||||
StatsContextFactory statsFactory, Metadata headers,
|
||||
Supplier<Stopwatch> stopwatchSupplier) {
|
||||
Metadata.Key<CensusContext> censusHeader = createCensusHeader(censusFactory);
|
||||
CensusContext parentCtx = headers.get(censusHeader);
|
||||
Metadata.Key<StatsContext> statsHeader = createStatsHeader(statsFactory);
|
||||
StatsContext parentCtx = headers.get(statsHeader);
|
||||
if (parentCtx == null) {
|
||||
parentCtx = censusFactory.getDefault();
|
||||
parentCtx = statsFactory.getDefault();
|
||||
}
|
||||
return new StatsTraceContext(Side.SERVER, methodName, parentCtx, stopwatchSupplier,
|
||||
censusHeader);
|
||||
statsHeader);
|
||||
}
|
||||
|
||||
/**
|
||||
* Propagate the context to the outgoing headers.
|
||||
*/
|
||||
void propagateToHeaders(Metadata headers) {
|
||||
headers.discardAll(censusHeader);
|
||||
headers.put(censusHeader, censusCtx);
|
||||
headers.discardAll(statsHeader);
|
||||
headers.put(statsHeader, statsCtx);
|
||||
}
|
||||
|
||||
Metadata.Key<CensusContext> getCensusHeader() {
|
||||
return censusHeader;
|
||||
Metadata.Key<StatsContext> getStatsHeader() {
|
||||
return statsHeader;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
CensusContext getCensusContext() {
|
||||
return censusCtx;
|
||||
StatsContext getStatsContext() {
|
||||
return statsCtx;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static Metadata.Key<CensusContext> createCensusHeader(
|
||||
final CensusContextFactory censusCtxFactory) {
|
||||
return Metadata.Key.of("grpc-census-bin", new Metadata.BinaryMarshaller<CensusContext>() {
|
||||
static Metadata.Key<StatsContext> createStatsHeader(final StatsContextFactory statsCtxFactory) {
|
||||
return Metadata.Key.of("grpc-census-bin", new Metadata.BinaryMarshaller<StatsContext>() {
|
||||
@Override
|
||||
public byte[] toBytes(CensusContext context) {
|
||||
ByteBuffer buffer = context.serialize();
|
||||
public byte[] toBytes(StatsContext context) {
|
||||
// TODO(carl-mastrangelo): currently we only make sure the correctness. We may need to
|
||||
// optimize out the allocation and copy in the future.
|
||||
byte[] bytes = new byte[buffer.remaining()];
|
||||
buffer.get(bytes);
|
||||
return bytes;
|
||||
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
|
||||
try {
|
||||
context.serialize(buffer);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return buffer.toByteArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CensusContext parseBytes(byte[] serialized) {
|
||||
return censusCtxFactory.deserialize(ByteBuffer.wrap(serialized));
|
||||
public StatsContext parseBytes(byte[] serialized) {
|
||||
try {
|
||||
return statsCtxFactory.deserialize(new ByteArrayInputStream(serialized));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -209,11 +217,11 @@ public final class StatsTraceContext {
|
|||
return;
|
||||
}
|
||||
stopwatch.stop();
|
||||
MetricName latencyMetric;
|
||||
MetricName wireBytesSentMetric;
|
||||
MetricName wireBytesReceivedMetric;
|
||||
MetricName uncompressedBytesSentMetric;
|
||||
MetricName uncompressedBytesReceivedMetric;
|
||||
MeasurementDescriptor latencyMetric;
|
||||
MeasurementDescriptor wireBytesSentMetric;
|
||||
MeasurementDescriptor wireBytesReceivedMetric;
|
||||
MeasurementDescriptor uncompressedBytesSentMetric;
|
||||
MeasurementDescriptor uncompressedBytesReceivedMetric;
|
||||
if (side == Side.CLIENT) {
|
||||
latencyMetric = RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY;
|
||||
wireBytesSentMetric = RpcConstants.RPC_CLIENT_REQUEST_BYTES;
|
||||
|
@ -227,9 +235,9 @@ public final class StatsTraceContext {
|
|||
uncompressedBytesSentMetric = RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES;
|
||||
uncompressedBytesReceivedMetric = RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES;
|
||||
}
|
||||
censusCtx
|
||||
.with(RpcConstants.RPC_STATUS, new TagValue(status.getCode().toString()))
|
||||
.record(MetricMap.builder()
|
||||
statsCtx
|
||||
.with(RpcConstants.RPC_STATUS, TagValue.create(status.getCode().toString()))
|
||||
.record(MeasurementMap.builder()
|
||||
.put(latencyMetric, stopwatch.elapsed(TimeUnit.MILLISECONDS))
|
||||
.put(wireBytesSentMetric, wireBytesSent)
|
||||
.put(wireBytesReceivedMetric, wireBytesReceived)
|
||||
|
|
|
@ -106,7 +106,7 @@ public class CallCredentialsApplyingTest {
|
|||
|
||||
private final Metadata origHeaders = new Metadata();
|
||||
private final StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(
|
||||
method.getFullMethodName(), NoopCensusContextFactory.INSTANCE, GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
method.getFullMethodName(), NoopStatsContextFactory.INSTANCE, GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
private ForwardingConnectionClientTransport transport;
|
||||
private CallOptions callOptions;
|
||||
|
||||
|
|
|
@ -51,12 +51,12 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.census.CensusContext;
|
||||
import com.google.census.RpcConstants;
|
||||
import com.google.census.TagValue;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import com.google.instrumentation.stats.RpcConstants;
|
||||
import com.google.instrumentation.stats.StatsContext;
|
||||
import com.google.instrumentation.stats.TagValue;
|
||||
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.ClientCall;
|
||||
|
@ -71,8 +71,8 @@ import io.grpc.MethodDescriptor.Marshaller;
|
|||
import io.grpc.MethodDescriptor.MethodType;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
|
||||
import io.grpc.internal.testing.CensusTestUtils.FakeCensusContextFactory;
|
||||
import io.grpc.internal.testing.CensusTestUtils;
|
||||
import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
|
||||
import io.grpc.internal.testing.StatsTestUtils;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -121,13 +121,13 @@ public class ClientCallImplTest {
|
|||
new TestMarshaller<Void>(),
|
||||
new TestMarshaller<Void>());
|
||||
|
||||
private final FakeCensusContextFactory censusCtxFactory = new FakeCensusContextFactory();
|
||||
private final CensusContext parentCensusContext = censusCtxFactory.getDefault().with(
|
||||
CensusTestUtils.EXTRA_TAG, new TagValue("extra-tag-value"));
|
||||
private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory();
|
||||
private final StatsContext parentStatsContext = statsCtxFactory.getDefault().with(
|
||||
StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value"));
|
||||
private final StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContextForTesting(
|
||||
method.getFullMethodName(), censusCtxFactory, parentCensusContext,
|
||||
method.getFullMethodName(), statsCtxFactory, parentStatsContext,
|
||||
fakeClock.getStopwatchSupplier());
|
||||
private final CensusContext censusCtx = censusCtxFactory.contexts.poll();
|
||||
private final StatsContext statsCtx = statsCtxFactory.contexts.poll();
|
||||
|
||||
@Mock private ClientStreamListener streamListener;
|
||||
@Mock private ClientTransport clientTransport;
|
||||
|
@ -154,7 +154,7 @@ public class ClientCallImplTest {
|
|||
@Before
|
||||
public void setUp() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
assertNotNull(censusCtx);
|
||||
assertNotNull(statsCtx);
|
||||
when(provider.get(any(CallOptions.class))).thenReturn(transport);
|
||||
when(transport.newStream(any(MethodDescriptor.class), any(Metadata.class),
|
||||
any(CallOptions.class), any(StatsTraceContext.class))).thenReturn(stream);
|
||||
|
@ -443,10 +443,10 @@ public class ClientCallImplTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void prepareHeaders_censusCtxAdded() {
|
||||
public void prepareHeaders_statsCtxAdded() {
|
||||
Metadata m = new Metadata();
|
||||
ClientCallImpl.prepareHeaders(m, decompressorRegistry, Codec.Identity.NONE, statsTraceCtx);
|
||||
assertEquals(parentCensusContext, m.get(statsTraceCtx.getCensusHeader()));
|
||||
assertEquals(parentStatsContext, m.get(statsTraceCtx.getStatsHeader()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -825,7 +825,7 @@ public class ClientCallImplTest {
|
|||
}
|
||||
|
||||
private void assertStatusInStats(Status.Code statusCode) {
|
||||
CensusTestUtils.MetricsRecord record = censusCtxFactory.pollRecord();
|
||||
StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
|
||||
assertNotNull(record);
|
||||
TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
|
||||
assertNotNull(statusTag);
|
||||
|
|
|
@ -110,10 +110,10 @@ public class DelayedClientTransport2Test {
|
|||
private final CallOptions callOptions = CallOptions.DEFAULT.withAuthority("dummy_value");
|
||||
private final CallOptions callOptions2 = CallOptions.DEFAULT.withAuthority("dummy_value2");
|
||||
private final StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(
|
||||
method.getFullMethodName(), NoopCensusContextFactory.INSTANCE,
|
||||
method.getFullMethodName(), NoopStatsContextFactory.INSTANCE,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
private final StatsTraceContext statsTraceCtx2 = StatsTraceContext.newClientContext(
|
||||
method2.getFullMethodName(), NoopCensusContextFactory.INSTANCE,
|
||||
method2.getFullMethodName(), NoopStatsContextFactory.INSTANCE,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
|
||||
private final FakeClock fakeExecutor = new FakeClock();
|
||||
|
|
|
@ -105,10 +105,10 @@ public class DelayedClientTransportTest {
|
|||
private final CallOptions callOptions = CallOptions.DEFAULT.withAuthority("dummy_value");
|
||||
private final CallOptions callOptions2 = CallOptions.DEFAULT.withAuthority("dummy_value2");
|
||||
private final StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(
|
||||
method.getFullMethodName(), NoopCensusContextFactory.INSTANCE,
|
||||
method.getFullMethodName(), NoopStatsContextFactory.INSTANCE,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
private final StatsTraceContext statsTraceCtx2 = StatsTraceContext.newClientContext(
|
||||
method2.getFullMethodName(), NoopCensusContextFactory.INSTANCE,
|
||||
method2.getFullMethodName(), NoopStatsContextFactory.INSTANCE,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
|
||||
private final FakeClock fakeExecutor = new FakeClock();
|
||||
|
|
|
@ -140,7 +140,7 @@ public class ManagedChannelImplIdlenessTest {
|
|||
TimeUnit.SECONDS.toMillis(IDLE_TIMEOUT_SECONDS),
|
||||
executor.getScheduledExecutorService(), USER_AGENT,
|
||||
Collections.<ClientInterceptor>emptyList(),
|
||||
NoopCensusContextFactory.INSTANCE);
|
||||
NoopStatsContextFactory.INSTANCE);
|
||||
newTransports = TestUtils.captureTransports(mockTransportFactory);
|
||||
|
||||
for (int i = 0; i < 2; i++) {
|
||||
|
|
|
@ -78,7 +78,7 @@ import io.grpc.SecurityLevel;
|
|||
import io.grpc.Status;
|
||||
import io.grpc.StringMarshaller;
|
||||
import io.grpc.TransportManager;
|
||||
import io.grpc.internal.testing.CensusTestUtils.FakeCensusContextFactory;
|
||||
import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -127,7 +127,7 @@ public class ManagedChannelImplTest {
|
|||
private final ResolvedServerInfo server = new ResolvedServerInfo(socketAddress, Attributes.EMPTY);
|
||||
private final FakeClock timer = new FakeClock();
|
||||
private final FakeClock executor = new FakeClock();
|
||||
private final FakeCensusContextFactory censusCtxFactory = new FakeCensusContextFactory();
|
||||
private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory();
|
||||
private SpyingLoadBalancerFactory loadBalancerFactory =
|
||||
new SpyingLoadBalancerFactory(PickFirstBalancerFactory.getInstance());
|
||||
|
||||
|
@ -165,7 +165,7 @@ public class ManagedChannelImplTest {
|
|||
mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
|
||||
CompressorRegistry.getDefaultInstance(), timerService, timer.getStopwatchSupplier(),
|
||||
ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE,
|
||||
executor.getScheduledExecutorService(), userAgent, interceptors, censusCtxFactory);
|
||||
executor.getScheduledExecutorService(), userAgent, interceptors, statsCtxFactory);
|
||||
// Force-exit the initial idle-mode
|
||||
channel.exitIdleMode();
|
||||
// Will start NameResolver in the scheduled executor
|
||||
|
@ -257,8 +257,8 @@ public class ManagedChannelImplTest {
|
|||
|
||||
verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT),
|
||||
statsTraceCtxCaptor.capture());
|
||||
assertEquals(censusCtxFactory.pollContextOrFail(),
|
||||
statsTraceCtxCaptor.getValue().getCensusContext());
|
||||
assertEquals(statsCtxFactory.pollContextOrFail(),
|
||||
statsTraceCtxCaptor.getValue().getStatsContext());
|
||||
verify(mockStream).start(streamListenerCaptor.capture());
|
||||
verify(mockStream).setCompressor(isA(Compressor.class));
|
||||
ClientStreamListener streamListener = streamListenerCaptor.getValue();
|
||||
|
@ -273,8 +273,8 @@ public class ManagedChannelImplTest {
|
|||
call2.start(mockCallListener2, headers2);
|
||||
verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT),
|
||||
statsTraceCtxCaptor.capture());
|
||||
assertEquals(censusCtxFactory.pollContextOrFail(),
|
||||
statsTraceCtxCaptor.getValue().getCensusContext());
|
||||
assertEquals(statsCtxFactory.pollContextOrFail(),
|
||||
statsTraceCtxCaptor.getValue().getStatsContext());
|
||||
|
||||
verify(mockStream2).start(streamListenerCaptor.capture());
|
||||
ClientStreamListener streamListener2 = streamListenerCaptor.getValue();
|
||||
|
|
|
@ -102,10 +102,10 @@ public class ManagedChannelImplTransportManagerTest {
|
|||
private final CallOptions callOptions = CallOptions.DEFAULT.withAuthority("dummy_value");
|
||||
private final CallOptions callOptions2 = CallOptions.DEFAULT.withAuthority("dummy_value2");
|
||||
private final StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(
|
||||
method.getFullMethodName(), NoopCensusContextFactory.INSTANCE,
|
||||
method.getFullMethodName(), NoopStatsContextFactory.INSTANCE,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
private final StatsTraceContext statsTraceCtx2 = StatsTraceContext.newClientContext(
|
||||
method2.getFullMethodName(), NoopCensusContextFactory.INSTANCE,
|
||||
method2.getFullMethodName(), NoopStatsContextFactory.INSTANCE,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
|
||||
private ManagedChannelImpl channel;
|
||||
|
@ -142,7 +142,7 @@ public class ManagedChannelImplTransportManagerTest {
|
|||
CompressorRegistry.getDefaultInstance(), GrpcUtil.TIMER_SERVICE,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER, ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE,
|
||||
executor, USER_AGENT, Collections.<ClientInterceptor>emptyList(),
|
||||
NoopCensusContextFactory.INSTANCE);
|
||||
NoopStatsContextFactory.INSTANCE);
|
||||
|
||||
ArgumentCaptor<TransportManager<ClientTransport>> tmCaptor
|
||||
= ArgumentCaptor.forClass(null);
|
||||
|
|
|
@ -43,18 +43,18 @@ import static org.mockito.Mockito.times;
|
|||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
|
||||
import com.google.census.RpcConstants;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.io.ByteStreams;
|
||||
import com.google.common.primitives.Bytes;
|
||||
import com.google.instrumentation.stats.RpcConstants;
|
||||
|
||||
import io.grpc.Codec;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.grpc.internal.MessageDeframer.Listener;
|
||||
import io.grpc.internal.MessageDeframer.SizeEnforcingInputStream;
|
||||
import io.grpc.internal.testing.CensusTestUtils.FakeCensusContextFactory;
|
||||
import io.grpc.internal.testing.CensusTestUtils.MetricsRecord;
|
||||
import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
|
||||
import io.grpc.internal.testing.StatsTestUtils.MetricsRecord;
|
||||
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -81,11 +81,11 @@ public class MessageDeframerTest {
|
|||
@Rule public final ExpectedException thrown = ExpectedException.none();
|
||||
|
||||
private Listener listener = mock(Listener.class);
|
||||
private final FakeCensusContextFactory censusCtxFactory = new FakeCensusContextFactory();
|
||||
private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory();
|
||||
// MessageFramerTest tests with a server-side StatsTraceContext, so here we test with a
|
||||
// client-side StatsTraceContext.
|
||||
private StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(
|
||||
"service/method", censusCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
"service/method", statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
|
||||
private MessageDeframer deframer = new MessageDeframer(listener, Codec.Identity.NONE,
|
||||
DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx);
|
||||
|
@ -393,7 +393,7 @@ public class MessageDeframerTest {
|
|||
|
||||
private void checkStats(long wireBytesReceived, long uncompressedBytesReceived) {
|
||||
statsTraceCtx.callEnded(Status.OK);
|
||||
MetricsRecord record = censusCtxFactory.pollRecord();
|
||||
MetricsRecord record = statsCtxFactory.pollRecord();
|
||||
assertEquals(0, record.getMetricAsLongOrFail(
|
||||
RpcConstants.RPC_CLIENT_REQUEST_BYTES));
|
||||
assertEquals(0, record.getMetricAsLongOrFail(
|
||||
|
|
|
@ -41,14 +41,14 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
|
||||
import com.google.census.RpcConstants;
|
||||
import com.google.instrumentation.stats.RpcConstants;
|
||||
|
||||
import io.grpc.Codec;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.StatsTraceContext;
|
||||
import io.grpc.internal.testing.CensusTestUtils.FakeCensusContextFactory;
|
||||
import io.grpc.internal.testing.CensusTestUtils.MetricsRecord;
|
||||
import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
|
||||
import io.grpc.internal.testing.StatsTestUtils.MetricsRecord;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -78,18 +78,18 @@ public class MessageFramerTest {
|
|||
private ArgumentCaptor<ByteWritableBuffer> frameCaptor;
|
||||
private BytesWritableBufferAllocator allocator =
|
||||
new BytesWritableBufferAllocator(1000, 1000);
|
||||
private FakeCensusContextFactory censusCtxFactory;
|
||||
private FakeStatsContextFactory statsCtxFactory;
|
||||
private StatsTraceContext statsTraceCtx;
|
||||
|
||||
/** Set up for test. */
|
||||
@Before
|
||||
public void setUp() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
censusCtxFactory = new FakeCensusContextFactory();
|
||||
statsCtxFactory = new FakeStatsContextFactory();
|
||||
// MessageDeframerTest tests with a client-side StatsTraceContext, so here we test with a
|
||||
// server-side StatsTraceContext.
|
||||
statsTraceCtx = StatsTraceContext.newServerContext(
|
||||
"service/method", censusCtxFactory, new Metadata(), GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
"service/method", statsCtxFactory, new Metadata(), GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
framer = new MessageFramer(sink, allocator, statsTraceCtx);
|
||||
}
|
||||
|
||||
|
@ -389,7 +389,7 @@ public class MessageFramerTest {
|
|||
|
||||
private void checkStats(long wireBytesSent, long uncompressedBytesSent) {
|
||||
statsTraceCtx.callEnded(Status.OK);
|
||||
MetricsRecord record = censusCtxFactory.pollRecord();
|
||||
MetricsRecord record = statsCtxFactory.pollRecord();
|
||||
assertEquals(0, record.getMetricAsLongOrFail(
|
||||
RpcConstants.RPC_SERVER_REQUEST_BYTES));
|
||||
assertEquals(0, record.getMetricAsLongOrFail(
|
||||
|
|
|
@ -43,9 +43,9 @@ import static org.mockito.Mockito.doThrow;
|
|||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.census.RpcConstants;
|
||||
import com.google.census.TagValue;
|
||||
import com.google.common.io.CharStreams;
|
||||
import com.google.instrumentation.stats.RpcConstants;
|
||||
import com.google.instrumentation.stats.TagValue;
|
||||
|
||||
import io.grpc.CompressorRegistry;
|
||||
import io.grpc.Context;
|
||||
|
@ -57,8 +57,8 @@ import io.grpc.MethodDescriptor.MethodType;
|
|||
import io.grpc.ServerCall;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl;
|
||||
import io.grpc.internal.testing.CensusTestUtils.FakeCensusContextFactory;
|
||||
import io.grpc.internal.testing.CensusTestUtils;
|
||||
import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
|
||||
import io.grpc.internal.testing.StatsTestUtils;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
|
@ -90,9 +90,9 @@ public class ServerCallImplTest {
|
|||
MethodType.UNARY, "/service/method", new LongMarshaller(), new LongMarshaller());
|
||||
|
||||
private final Metadata requestHeaders = new Metadata();
|
||||
private final FakeCensusContextFactory censusCtxFactory = new FakeCensusContextFactory();
|
||||
private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory();
|
||||
private final StatsTraceContext statsTraceCtx = StatsTraceContext.newServerContext(
|
||||
method.getFullMethodName(), censusCtxFactory, requestHeaders, GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
method.getFullMethodName(), statsCtxFactory, requestHeaders, GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
|
@ -327,7 +327,7 @@ public class ServerCallImplTest {
|
|||
}
|
||||
|
||||
private void checkStats(Status.Code statusCode) {
|
||||
CensusTestUtils.MetricsRecord record = censusCtxFactory.pollRecord();
|
||||
StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
|
||||
assertNotNull(record);
|
||||
TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
|
||||
assertNotNull(statusTag);
|
||||
|
|
|
@ -52,12 +52,12 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.census.CensusContext;
|
||||
import com.google.census.RpcConstants;
|
||||
import com.google.census.TagValue;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.truth.Truth;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.instrumentation.stats.RpcConstants;
|
||||
import com.google.instrumentation.stats.StatsContext;
|
||||
import com.google.instrumentation.stats.TagValue;
|
||||
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.Compressor;
|
||||
|
@ -77,8 +77,8 @@ import io.grpc.ServerTransportFilter;
|
|||
import io.grpc.ServiceDescriptor;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StringMarshaller;
|
||||
import io.grpc.internal.testing.CensusTestUtils;
|
||||
import io.grpc.internal.testing.CensusTestUtils.FakeCensusContextFactory;
|
||||
import io.grpc.internal.testing.StatsTestUtils;
|
||||
import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
|
||||
import io.grpc.util.MutableHandlerRegistry;
|
||||
|
||||
import org.junit.After;
|
||||
|
@ -119,7 +119,7 @@ public class ServerImplTest {
|
|||
Context.ROOT.withValue(SERVER_ONLY, "yes").withCancellation();
|
||||
private static final ImmutableList<ServerTransportFilter> NO_FILTERS = ImmutableList.of();
|
||||
|
||||
private final FakeCensusContextFactory censusCtxFactory = new FakeCensusContextFactory();
|
||||
private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory();
|
||||
private final CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
|
||||
private final DecompressorRegistry decompressorRegistry =
|
||||
DecompressorRegistry.getDefaultInstance();
|
||||
|
@ -138,7 +138,7 @@ public class ServerImplTest {
|
|||
private MutableHandlerRegistry fallbackRegistry = new MutableHandlerRegistry();
|
||||
private SimpleServer transportServer = new SimpleServer();
|
||||
private ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, censusCtxFactory,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
|
||||
@Captor
|
||||
|
@ -174,7 +174,7 @@ public class ServerImplTest {
|
|||
public void shutdown() {}
|
||||
};
|
||||
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, censusCtxFactory,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
server.start();
|
||||
server.shutdown();
|
||||
|
@ -193,7 +193,7 @@ public class ServerImplTest {
|
|||
}
|
||||
};
|
||||
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, censusCtxFactory,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
server.shutdown();
|
||||
assertTrue(server.isShutdown());
|
||||
|
@ -203,7 +203,7 @@ public class ServerImplTest {
|
|||
@Test
|
||||
public void startStopImmediateWithChildTransport() throws IOException {
|
||||
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, censusCtxFactory,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
server.start();
|
||||
class DelayedShutdownServerTransport extends SimpleServerTransport {
|
||||
|
@ -228,7 +228,7 @@ public class ServerImplTest {
|
|||
@Test
|
||||
public void startShutdownNowImmediateWithChildTransport() throws IOException {
|
||||
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, censusCtxFactory,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
server.start();
|
||||
class DelayedShutdownServerTransport extends SimpleServerTransport {
|
||||
|
@ -256,7 +256,7 @@ public class ServerImplTest {
|
|||
@Test
|
||||
public void shutdownNowAfterShutdown() throws IOException {
|
||||
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, censusCtxFactory,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
server.start();
|
||||
class DelayedShutdownServerTransport extends SimpleServerTransport {
|
||||
|
@ -291,7 +291,7 @@ public class ServerImplTest {
|
|||
}
|
||||
};
|
||||
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, censusCtxFactory,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
server.start();
|
||||
class DelayedShutdownServerTransport extends SimpleServerTransport {
|
||||
|
@ -329,7 +329,7 @@ public class ServerImplTest {
|
|||
|
||||
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry,
|
||||
new FailingStartupServer(), SERVER_CONTEXT, decompressorRegistry, compressorRegistry,
|
||||
NO_FILTERS, censusCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
NO_FILTERS, statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
try {
|
||||
server.start();
|
||||
fail("expected exception");
|
||||
|
@ -356,7 +356,7 @@ public class ServerImplTest {
|
|||
assertEquals(Status.Code.UNIMPLEMENTED, status.getCode());
|
||||
assertEquals("Method not found: Waiter/nonexist", status.getDescription());
|
||||
|
||||
CensusTestUtils.MetricsRecord record = censusCtxFactory.pollRecord();
|
||||
StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
|
||||
assertNotNull(record);
|
||||
TagValue methodTag = record.tags.get(RpcConstants.RPC_SERVER_METHOD);
|
||||
assertNotNull(methodTag);
|
||||
|
@ -370,8 +370,8 @@ public class ServerImplTest {
|
|||
public void basicExchangeSuccessful() throws Exception {
|
||||
final Metadata.Key<String> metadataKey
|
||||
= Metadata.Key.of("inception", Metadata.ASCII_STRING_MARSHALLER);
|
||||
final Metadata.Key<CensusContext> censusHeaderKey
|
||||
= StatsTraceContext.createCensusHeader(censusCtxFactory);
|
||||
final Metadata.Key<StatsContext> statsHeaderKey
|
||||
= StatsTraceContext.createStatsHeader(statsCtxFactory);
|
||||
final AtomicReference<ServerCall<String, Integer>> callReference
|
||||
= new AtomicReference<ServerCall<String, Integer>>();
|
||||
MethodDescriptor<String, Integer> method = MethodDescriptor.create(
|
||||
|
@ -398,9 +398,9 @@ public class ServerImplTest {
|
|||
|
||||
Metadata requestHeaders = new Metadata();
|
||||
requestHeaders.put(metadataKey, "value");
|
||||
CensusContext censusContextOnClient = censusCtxFactory.getDefault().with(
|
||||
CensusTestUtils.EXTRA_TAG, new TagValue("extraTagValue"));
|
||||
requestHeaders.put(censusHeaderKey, censusContextOnClient);
|
||||
StatsContext statsContextOnClient = statsCtxFactory.getDefault().with(
|
||||
StatsTestUtils.EXTRA_TAG, TagValue.create("extraTagValue"));
|
||||
requestHeaders.put(statsHeaderKey, statsContextOnClient);
|
||||
StatsTraceContext statsTraceCtx =
|
||||
transportListener.methodDetermined("Waiter/serve", requestHeaders);
|
||||
assertNotNull(statsTraceCtx);
|
||||
|
@ -456,7 +456,7 @@ public class ServerImplTest {
|
|||
verifyNoMoreInteractions(callListener);
|
||||
|
||||
// Check stats
|
||||
CensusTestUtils.MetricsRecord record = censusCtxFactory.pollRecord();
|
||||
StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
|
||||
assertNotNull(record);
|
||||
TagValue methodTag = record.tags.get(RpcConstants.RPC_SERVER_METHOD);
|
||||
assertNotNull(methodTag);
|
||||
|
@ -464,7 +464,7 @@ public class ServerImplTest {
|
|||
TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
|
||||
assertNotNull(statusTag);
|
||||
assertEquals(Status.Code.OK.toString(), statusTag.toString());
|
||||
TagValue extraTag = record.tags.get(CensusTestUtils.EXTRA_TAG);
|
||||
TagValue extraTag = record.tags.get(StatsTestUtils.EXTRA_TAG);
|
||||
assertNotNull(extraTag);
|
||||
assertEquals("extraTagValue", extraTag.toString());
|
||||
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
|
||||
|
@ -542,7 +542,7 @@ public class ServerImplTest {
|
|||
|
||||
ServerImpl server = new ServerImpl(MoreExecutors.directExecutor(), registry, fallbackRegistry,
|
||||
transportServer, SERVER_CONTEXT, decompressorRegistry, compressorRegistry,
|
||||
ImmutableList.of(filter1, filter2), censusCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
ImmutableList.of(filter1, filter2), statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
server.start();
|
||||
ServerTransportListener transportListener
|
||||
= transportServer.registerNewServerTransport(new SimpleServerTransport());
|
||||
|
@ -623,7 +623,7 @@ public class ServerImplTest {
|
|||
|
||||
transportServer = new MaybeDeadlockingServer();
|
||||
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, censusCtxFactory,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
server.start();
|
||||
new Thread() {
|
||||
|
@ -829,7 +829,7 @@ public class ServerImplTest {
|
|||
}
|
||||
};
|
||||
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, censusCtxFactory,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
server.start();
|
||||
|
||||
|
@ -840,7 +840,7 @@ public class ServerImplTest {
|
|||
public void getPortBeforeStartedFails() {
|
||||
transportServer = new SimpleServer();
|
||||
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, censusCtxFactory,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
thrown.expect(IllegalStateException.class);
|
||||
thrown.expectMessage("started");
|
||||
|
@ -851,7 +851,7 @@ public class ServerImplTest {
|
|||
public void getPortAfterTerminationFails() throws Exception {
|
||||
transportServer = new SimpleServer();
|
||||
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, censusCtxFactory,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
server.start();
|
||||
server.shutdown();
|
||||
|
@ -872,7 +872,7 @@ public class ServerImplTest {
|
|||
.build();
|
||||
transportServer = new SimpleServer();
|
||||
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, censusCtxFactory,
|
||||
SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
server.start();
|
||||
|
||||
|
|
|
@ -46,13 +46,13 @@ import com.google.auth.oauth2.ComputeEngineCredentials;
|
|||
import com.google.auth.oauth2.GoogleCredentials;
|
||||
import com.google.auth.oauth2.OAuth2Credentials;
|
||||
import com.google.auth.oauth2.ServiceAccountCredentials;
|
||||
import com.google.census.CensusContextFactory;
|
||||
import com.google.census.RpcConstants;
|
||||
import com.google.census.TagValue;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.net.HostAndPort;
|
||||
import com.google.instrumentation.stats.RpcConstants;
|
||||
import com.google.instrumentation.stats.StatsContextFactory;
|
||||
import com.google.instrumentation.stats.TagValue;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.EmptyProtos.Empty;
|
||||
import com.google.protobuf.MessageLite;
|
||||
|
@ -71,8 +71,8 @@ import io.grpc.StatusRuntimeException;
|
|||
import io.grpc.auth.MoreCallCredentials;
|
||||
import io.grpc.internal.AbstractServerImplBuilder;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.internal.testing.CensusTestUtils.FakeCensusContextFactory;
|
||||
import io.grpc.internal.testing.CensusTestUtils.MetricsRecord;
|
||||
import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
|
||||
import io.grpc.internal.testing.StatsTestUtils.MetricsRecord;
|
||||
import io.grpc.protobuf.ProtoUtils;
|
||||
import io.grpc.stub.MetadataUtils;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
|
@ -130,10 +130,10 @@ public abstract class AbstractInteropTest {
|
|||
new AtomicReference<Metadata>();
|
||||
private static ScheduledExecutorService testServiceExecutor;
|
||||
private static Server server;
|
||||
private static final FakeCensusContextFactory clientCensusFactory =
|
||||
new FakeCensusContextFactory();
|
||||
private static final FakeCensusContextFactory serverCensusFactory =
|
||||
new FakeCensusContextFactory();
|
||||
private static final FakeStatsContextFactory clientStatsFactory =
|
||||
new FakeStatsContextFactory();
|
||||
private static final FakeStatsContextFactory serverStatsFactory =
|
||||
new FakeStatsContextFactory();
|
||||
protected static final Empty EMPTY = Empty.getDefaultInstance();
|
||||
|
||||
protected static void startStaticServer(
|
||||
|
@ -152,7 +152,7 @@ public abstract class AbstractInteropTest {
|
|||
builder.addService(ServerInterceptors.intercept(
|
||||
new TestServiceImpl(testServiceExecutor),
|
||||
allInterceptors));
|
||||
builder.censusContextFactory(serverCensusFactory);
|
||||
builder.statsContextFactory(serverStatsFactory);
|
||||
try {
|
||||
server = builder.build().start();
|
||||
} catch (IOException ex) {
|
||||
|
@ -183,8 +183,8 @@ public abstract class AbstractInteropTest {
|
|||
blockingStub = TestServiceGrpc.newBlockingStub(channel);
|
||||
asyncStub = TestServiceGrpc.newStub(channel);
|
||||
requestHeadersCapture.set(null);
|
||||
clientCensusFactory.rolloverRecords();
|
||||
serverCensusFactory.rolloverRecords();
|
||||
clientStatsFactory.rolloverRecords();
|
||||
serverStatsFactory.rolloverRecords();
|
||||
}
|
||||
|
||||
/** Clean up. */
|
||||
|
@ -197,8 +197,8 @@ public abstract class AbstractInteropTest {
|
|||
|
||||
protected abstract ManagedChannel createChannel();
|
||||
|
||||
protected final CensusContextFactory getClientCensusFactory() {
|
||||
return clientCensusFactory;
|
||||
protected final StatsContextFactory getClientStatsFactory() {
|
||||
return clientStatsFactory;
|
||||
}
|
||||
|
||||
protected boolean metricsExpected() {
|
||||
|
@ -1274,7 +1274,7 @@ public abstract class AbstractInteropTest {
|
|||
|
||||
private void assertClientMetrics(String method, Status.Code status,
|
||||
Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses) {
|
||||
MetricsRecord clientRecord = clientCensusFactory.pollRecord();
|
||||
MetricsRecord clientRecord = clientStatsFactory.pollRecord();
|
||||
assertNotNull("clientRecord is not null", clientRecord);
|
||||
checkTags(clientRecord, false, method, status);
|
||||
if (requests != null && responses != null) {
|
||||
|
@ -1298,7 +1298,7 @@ public abstract class AbstractInteropTest {
|
|||
try {
|
||||
// On the server, the stats is finalized in ServerStreamListener.closed(), which can be run
|
||||
// after the client receives the final status. So we use a timeout.
|
||||
serverRecord = serverCensusFactory.pollRecord(1, TimeUnit.SECONDS);
|
||||
serverRecord = serverStatsFactory.pollRecord(1, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
|
|
@ -542,7 +542,8 @@ public class StressTestClient {
|
|||
|
||||
@Override
|
||||
protected boolean metricsExpected() {
|
||||
// TODO(zhangkun83): we may want to enable the real Census implementation in stress tests.
|
||||
// TODO(zhangkun83): we may want to enable the real google Instrumentation implementation in
|
||||
// stress tests.
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ public class AutoWindowSizingOnTest extends AbstractInteropTest {
|
|||
return NettyChannelBuilder.forAddress("localhost", getPort())
|
||||
.negotiationType(NegotiationType.PLAINTEXT)
|
||||
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
|
||||
.censusContextFactory(getClientCensusFactory())
|
||||
.statsContextFactory(getClientStatsFactory())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -75,7 +75,7 @@ public class Http2NettyLocalChannelTest extends AbstractInteropTest {
|
|||
.channelType(LocalChannel.class)
|
||||
.flowControlWindow(65 * 1024)
|
||||
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
|
||||
.censusContextFactory(getClientCensusFactory())
|
||||
.statsContextFactory(getClientStatsFactory())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -92,7 +92,7 @@ public class Http2NettyTest extends AbstractInteropTest {
|
|||
.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
|
||||
.sslProvider(SslProvider.OPENSSL)
|
||||
.build())
|
||||
.censusContextFactory(getClientCensusFactory())
|
||||
.statsContextFactory(getClientStatsFactory())
|
||||
.build();
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
|
|
|
@ -108,7 +108,7 @@ public class Http2OkHttpTest extends AbstractInteropTest {
|
|||
.cipherSuites(TestUtils.preferredTestCiphers().toArray(new String[0]))
|
||||
.tlsVersions(ConnectionSpec.MODERN_TLS.tlsVersions().toArray(new TlsVersion[0]))
|
||||
.build())
|
||||
.censusContextFactory(getClientCensusFactory())
|
||||
.statsContextFactory(getClientStatsFactory())
|
||||
.overrideAuthority(GrpcUtil.authorityFromHostAndPort(
|
||||
TestUtils.TEST_SERVER_HOST, getPort()));
|
||||
try {
|
||||
|
|
|
@ -60,7 +60,7 @@ public class InProcessTest extends AbstractInteropTest {
|
|||
@Override
|
||||
protected ManagedChannel createChannel() {
|
||||
return InProcessChannelBuilder.forName(SERVER_NAME)
|
||||
.censusContextFactory(getClientCensusFactory()).build();
|
||||
.statsContextFactory(getClientStatsFactory()).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -152,7 +152,7 @@ public class TransportCompressionTest extends AbstractInteropTest {
|
|||
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
|
||||
.decompressorRegistry(decompressors)
|
||||
.compressorRegistry(compressors)
|
||||
.censusContextFactory(getClientCensusFactory())
|
||||
.statsContextFactory(getClientStatsFactory())
|
||||
.intercept(new ClientInterceptor() {
|
||||
@Override
|
||||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
|
||||
|
|
|
@ -34,31 +34,34 @@ package io.grpc.internal.testing;
|
|||
import static com.google.common.base.Charsets.UTF_8;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
import com.google.census.CensusContext;
|
||||
import com.google.census.CensusContextFactory;
|
||||
import com.google.census.Metric;
|
||||
import com.google.census.MetricMap;
|
||||
import com.google.census.MetricName;
|
||||
import com.google.census.TagKey;
|
||||
import com.google.census.TagValue;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.ByteStreams;
|
||||
import com.google.instrumentation.stats.MeasurementDescriptor;
|
||||
import com.google.instrumentation.stats.MeasurementMap;
|
||||
import com.google.instrumentation.stats.MeasurementValue;
|
||||
import com.google.instrumentation.stats.StatsContext;
|
||||
import com.google.instrumentation.stats.StatsContextFactory;
|
||||
import com.google.instrumentation.stats.TagKey;
|
||||
import com.google.instrumentation.stats.TagValue;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
public class CensusTestUtils {
|
||||
private CensusTestUtils() {
|
||||
public class StatsTestUtils {
|
||||
private StatsTestUtils() {
|
||||
}
|
||||
|
||||
public static class MetricsRecord {
|
||||
public final ImmutableMap<TagKey, TagValue> tags;
|
||||
public final MetricMap metrics;
|
||||
public final MeasurementMap metrics;
|
||||
|
||||
private MetricsRecord(ImmutableMap<TagKey, TagValue> tags, MetricMap metrics) {
|
||||
private MetricsRecord(ImmutableMap<TagKey, TagValue> tags, MeasurementMap metrics) {
|
||||
this.tags = tags;
|
||||
this.metrics = metrics;
|
||||
}
|
||||
|
@ -67,9 +70,9 @@ public class CensusTestUtils {
|
|||
* Returns the value of a metric, or {@code null} if not found.
|
||||
*/
|
||||
@Nullable
|
||||
public Double getMetric(MetricName metricName) {
|
||||
for (Metric m : metrics) {
|
||||
if (m.getName().equals(metricName)) {
|
||||
public Double getMetric(MeasurementDescriptor metricName) {
|
||||
for (MeasurementValue m : metrics) {
|
||||
if (m.getMeasurement().equals(metricName)) {
|
||||
return m.getValue();
|
||||
}
|
||||
}
|
||||
|
@ -79,7 +82,7 @@ public class CensusTestUtils {
|
|||
/**
|
||||
* Returns the value of a metric converted to long, or throw if not found.
|
||||
*/
|
||||
public long getMetricAsLongOrFail(MetricName metricName) {
|
||||
public long getMetricAsLongOrFail(MeasurementDescriptor metricName) {
|
||||
Double doubleValue = getMetric(metricName);
|
||||
checkNotNull(doubleValue, "Metric not found: %s", metricName.toString());
|
||||
long longValue = (long) (Math.abs(doubleValue) + 0.0001);
|
||||
|
@ -90,36 +93,36 @@ public class CensusTestUtils {
|
|||
}
|
||||
}
|
||||
|
||||
public static final TagKey EXTRA_TAG = new TagKey("/rpc/test/extratag");
|
||||
public static final TagKey EXTRA_TAG = TagKey.create("/rpc/test/extratag");
|
||||
|
||||
private static final String EXTRA_TAG_HEADER_VALUE_PREFIX = "extratag:";
|
||||
private static final String NO_EXTRA_TAG_HEADER_VALUE_PREFIX = "noextratag";
|
||||
|
||||
/**
|
||||
* A factory that makes fake {@link CensusContext}s and saves the created contexts to be
|
||||
* A factory that makes fake {@link StatsContext}s and saves the created contexts to be
|
||||
* accessible from {@link #pollContextOrFail}. The contexts it has created would save metrics
|
||||
* records to be accessible from {@link #pollRecord()} and {@link #pollRecord(long, TimeUnit)},
|
||||
* until {@link #rolloverRecords} is called.
|
||||
*/
|
||||
public static final class FakeCensusContextFactory extends CensusContextFactory {
|
||||
public static final class FakeStatsContextFactory extends StatsContextFactory {
|
||||
private BlockingQueue<MetricsRecord> records;
|
||||
public final BlockingQueue<FakeCensusContext> contexts =
|
||||
new LinkedBlockingQueue<FakeCensusContext>();
|
||||
private final FakeCensusContext defaultContext;
|
||||
public final BlockingQueue<FakeStatsContext> contexts =
|
||||
new LinkedBlockingQueue<FakeStatsContext>();
|
||||
private final FakeStatsContext defaultContext;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
public FakeCensusContextFactory() {
|
||||
public FakeStatsContextFactory() {
|
||||
rolloverRecords();
|
||||
defaultContext = new FakeCensusContext(ImmutableMap.<TagKey, TagValue>of(), this);
|
||||
defaultContext = new FakeStatsContext(ImmutableMap.<TagKey, TagValue>of(), this);
|
||||
// The records on the default context is not visible from pollRecord(), just like it's
|
||||
// not visible from pollContextOrFail() either.
|
||||
rolloverRecords();
|
||||
}
|
||||
|
||||
public CensusContext pollContextOrFail() {
|
||||
CensusContext cc = contexts.poll();
|
||||
public StatsContext pollContextOrFail() {
|
||||
StatsContext cc = contexts.poll();
|
||||
return checkNotNull(cc);
|
||||
}
|
||||
|
||||
|
@ -132,11 +135,16 @@ public class CensusTestUtils {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CensusContext deserialize(ByteBuffer buffer) {
|
||||
String serializedString = new String(buffer.array(), UTF_8);
|
||||
public StatsContext deserialize(InputStream buffer) {
|
||||
String serializedString;
|
||||
try {
|
||||
serializedString = new String(ByteStreams.toByteArray(buffer), UTF_8);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
if (serializedString.startsWith(EXTRA_TAG_HEADER_VALUE_PREFIX)) {
|
||||
return getDefault().with(EXTRA_TAG,
|
||||
new TagValue(serializedString.substring(EXTRA_TAG_HEADER_VALUE_PREFIX.length())));
|
||||
TagValue.create(serializedString.substring(EXTRA_TAG_HEADER_VALUE_PREFIX.length())));
|
||||
} else if (serializedString.startsWith(NO_EXTRA_TAG_HEADER_VALUE_PREFIX)) {
|
||||
return getDefault();
|
||||
} else {
|
||||
|
@ -145,7 +153,7 @@ public class CensusTestUtils {
|
|||
}
|
||||
|
||||
@Override
|
||||
public FakeCensusContext getDefault() {
|
||||
public FakeStatsContext getDefault() {
|
||||
return defaultContext;
|
||||
}
|
||||
|
||||
|
@ -164,13 +172,13 @@ public class CensusTestUtils {
|
|||
}
|
||||
}
|
||||
|
||||
public static class FakeCensusContext extends CensusContext {
|
||||
public static class FakeStatsContext extends StatsContext {
|
||||
private final ImmutableMap<TagKey, TagValue> tags;
|
||||
private final FakeCensusContextFactory factory;
|
||||
private final FakeStatsContextFactory factory;
|
||||
private final BlockingQueue<MetricsRecord> recordSink;
|
||||
|
||||
private FakeCensusContext(ImmutableMap<TagKey, TagValue> tags,
|
||||
FakeCensusContextFactory factory) {
|
||||
private FakeStatsContext(ImmutableMap<TagKey, TagValue> tags,
|
||||
FakeStatsContextFactory factory) {
|
||||
this.tags = tags;
|
||||
this.factory = factory;
|
||||
this.recordSink = factory.getCurrentRecordSink();
|
||||
|
@ -178,23 +186,26 @@ public class CensusTestUtils {
|
|||
|
||||
@Override
|
||||
public Builder builder() {
|
||||
return new FakeCensusContextBuilder(this);
|
||||
return new FakeStatsContextBuilder(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CensusContext record(MetricMap metrics) {
|
||||
public StatsContext record(MeasurementMap metrics) {
|
||||
recordSink.add(new MetricsRecord(tags, metrics));
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer serialize() {
|
||||
public void serialize(OutputStream os) {
|
||||
TagValue extraTagValue = tags.get(EXTRA_TAG);
|
||||
if (extraTagValue == null) {
|
||||
return ByteBuffer.wrap(NO_EXTRA_TAG_HEADER_VALUE_PREFIX.getBytes(UTF_8));
|
||||
} else {
|
||||
return ByteBuffer.wrap(
|
||||
(EXTRA_TAG_HEADER_VALUE_PREFIX + extraTagValue.toString()).getBytes(UTF_8));
|
||||
try {
|
||||
if (extraTagValue == null) {
|
||||
os.write(NO_EXTRA_TAG_HEADER_VALUE_PREFIX.getBytes(UTF_8));
|
||||
} else {
|
||||
os.write((EXTRA_TAG_HEADER_VALUE_PREFIX + extraTagValue.toString()).getBytes(UTF_8));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -205,10 +216,10 @@ public class CensusTestUtils {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (!(other instanceof FakeCensusContext)) {
|
||||
if (!(other instanceof FakeStatsContext)) {
|
||||
return false;
|
||||
}
|
||||
FakeCensusContext otherCtx = (FakeCensusContext) other;
|
||||
FakeStatsContext otherCtx = (FakeStatsContext) other;
|
||||
return tags.equals(otherCtx.tags);
|
||||
}
|
||||
|
||||
|
@ -218,24 +229,24 @@ public class CensusTestUtils {
|
|||
}
|
||||
}
|
||||
|
||||
private static class FakeCensusContextBuilder extends CensusContext.Builder {
|
||||
private static class FakeStatsContextBuilder extends StatsContext.Builder {
|
||||
private final ImmutableMap.Builder<TagKey, TagValue> tagsBuilder = ImmutableMap.builder();
|
||||
private final FakeCensusContext base;
|
||||
private final FakeStatsContext base;
|
||||
|
||||
private FakeCensusContextBuilder(FakeCensusContext base) {
|
||||
private FakeStatsContextBuilder(FakeStatsContext base) {
|
||||
this.base = base;
|
||||
tagsBuilder.putAll(base.tags);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CensusContext.Builder set(TagKey key, TagValue value) {
|
||||
public StatsContext.Builder set(TagKey key, TagValue value) {
|
||||
tagsBuilder.put(key, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CensusContext build() {
|
||||
FakeCensusContext context = new FakeCensusContext(tagsBuilder.build(), base.factory);
|
||||
public StatsContext build() {
|
||||
FakeStatsContext context = new FakeStatsContext(tagsBuilder.build(), base.factory);
|
||||
base.factory.contexts.add(context);
|
||||
return context;
|
||||
}
|
Loading…
Reference in New Issue