Revert "core, netty: add io.perfmark Annotations" (#5853)

This causes internal breakage which needs to be resolved before continuing.

This reverts commit 71967622d6.
This commit is contained in:
Carl Mastrangelo 2019-06-07 17:23:49 -07:00 committed by GitHub
parent 44ecdf3649
commit 2db3abc9ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 615 additions and 428 deletions

View File

@ -201,7 +201,6 @@ subprojects {
opencensus_impl: "io.opencensus:opencensus-impl:${opencensusVersion}",
opencensus_impl_lite: "io.opencensus:opencensus-impl-lite:${opencensusVersion}",
instrumentation_api: 'com.google.instrumentation:instrumentation-api:0.4.3',
perfmark: 'io.perfmark:perfmark-api:0.16.0',
protobuf: "com.google.protobuf:protobuf-java:${protobufVersion}",
protobuf_lite: "com.google.protobuf:protobuf-lite:3.0.1",
protoc_lite: "com.google.protobuf:protoc-gen-javalite:3.0.0",

View File

@ -1,3 +1,16 @@
PERFMARK_INTERNAL_ACCESSOR_SRCS = glob(
[
"src/main/java/io/grpc/perfmark/Internal*.java",
],
)
PERFMARK_SRCS = glob(
[
"src/main/java/io/grpc/perfmark/*.java",
],
exclude = PERFMARK_INTERNAL_ACCESSOR_SRCS,
)
java_library(
name = "core",
visibility = ["//visibility:public"],
@ -30,6 +43,7 @@ java_library(
]),
visibility = ["//:__subpackages__"],
deps = [
":perfmark",
"//api",
"//context",
"@com_google_android_annotations//jar",
@ -40,7 +54,6 @@ java_library(
"@com_google_j2objc_j2objc_annotations//jar",
"@io_opencensus_opencensus_api//jar",
"@io_opencensus_opencensus_contrib_grpc_metrics//jar",
"@io_perfmark_perfmark_api//jar",
"@org_codehaus_mojo_animal_sniffer_annotations//jar",
],
)
@ -63,3 +76,12 @@ java_library(
],
)
java_library(
name = "perfmark",
srcs = PERFMARK_SRCS,
visibility = ["//:__subpackages__"],
deps = [
"@com_google_code_findbugs_jsr305//jar",
"@com_google_errorprone_error_prone_annotations//jar",
],
)

View File

@ -4,7 +4,6 @@ dependencies {
compile project(':grpc-api'),
libraries.gson,
libraries.android_annotations,
libraries.perfmark
compile (libraries.opencensus_api) {
// prefer 3.0.2 from libraries instead of 3.0.1
exclude group: 'com.google.code.findbugs', module: 'jsr305'

View File

@ -46,9 +46,8 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.perfmark.Link;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
import io.grpc.perfmark.PerfMark;
import io.grpc.perfmark.PerfTag;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.concurrent.CancellationException;
@ -70,7 +69,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
= "gzip".getBytes(Charset.forName("US-ASCII"));
private final MethodDescriptor<ReqT, RespT> method;
private final Tag tag;
private final PerfTag tag;
private final Executor callExecutor;
private final CallTracer channelCallsTracer;
private final Context context;
@ -97,7 +96,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
boolean retryEnabled) {
this.method = method;
// TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this));
this.tag = PerfMark.createTag(method.getFullMethodName());
// If we know that the executor is a direct executor, we don't need to wrap it with a
// SerializingExecutor. This is purely for performance reasons.
// See https://github.com/grpc/grpc-java/issues/368
@ -113,7 +112,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
this.clientTransportProvider = clientTransportProvider;
this.deadlineCancellationExecutor = deadlineCancellationExecutor;
this.retryEnabled = retryEnabled;
PerfMark.event("ClientCall.<init>", tag);
}
private final class ContextCancellationListener implements CancellationListener {
@ -185,11 +183,11 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void start(Listener<RespT> observer, Metadata headers) {
PerfMark.startTask("ClientCall.start", tag);
PerfMark.taskStart(tag, "ClientCall.start");
try {
startInternal(observer, headers);
} finally {
PerfMark.stopTask("ClientCall.start", tag);
PerfMark.taskEnd(tag, "ClientCall.start");
}
}
@ -380,23 +378,18 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void request(int numMessages) {
PerfMark.startTask("ClientCall.request", tag);
try {
checkState(stream != null, "Not started");
checkArgument(numMessages >= 0, "Number requested must be non-negative");
stream.request(numMessages);
} finally {
PerfMark.stopTask("ClientCall.cancel", tag);
}
checkState(stream != null, "Not started");
checkArgument(numMessages >= 0, "Number requested must be non-negative");
stream.request(numMessages);
}
@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
PerfMark.startTask("ClientCall.cancel", tag);
PerfMark.taskStart(tag, "ClientCall.cancel");
try {
cancelInternal(message, cause);
} finally {
PerfMark.stopTask("ClientCall.cancel", tag);
PerfMark.taskEnd(tag, "ClientCall.cancel");
}
}
@ -431,11 +424,11 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void halfClose() {
PerfMark.startTask("ClientCall.halfClose", tag);
PerfMark.taskStart(tag, "ClientCall.halfClose");
try {
halfCloseInternal();
} finally {
PerfMark.stopTask("ClientCall.halfClose", tag);
PerfMark.taskEnd(tag, "ClientCall.halfClose");
}
}
@ -449,11 +442,11 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void sendMessage(ReqT message) {
PerfMark.startTask("ClientCall.sendMessage", tag);
PerfMark.taskStart(tag, "ClientCall.sendMessage");
try {
sendMessageInternal(message);
} finally {
PerfMark.stopTask("ClientCall.sendMessage", tag);
PerfMark.taskEnd(tag, "ClientCall.sendMessage");
}
}
@ -522,29 +515,17 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void headersRead(final Metadata headers) {
PerfMark.startTask("ClientStreamListener.headersRead", tag);
final Link link = PerfMark.link();
final class HeadersRead extends ContextRunnable {
HeadersRead() {
super(context);
}
@Override
public void runInContext() {
PerfMark.startTask("ClientCall$Listener.headersRead", tag);
link.link();
try {
runInternal();
} finally {
PerfMark.stopTask("ClientCall$Listener.headersRead", tag);
}
}
private void runInternal() {
public final void runInContext() {
if (closed) {
return;
}
PerfMark.taskStart(tag, "ClientCall.headersRead");
try {
observer.onHeaders(headers);
} catch (Throwable t) {
@ -552,43 +533,29 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
Status.CANCELLED.withCause(t).withDescription("Failed to read headers");
stream.cancel(status);
close(status, new Metadata());
} finally {
PerfMark.taskEnd(tag, "ClientCall.headersRead");
}
}
}
try {
callExecutor.execute(new HeadersRead());
} finally {
PerfMark.stopTask("ClientStreamListener.headersRead", tag);
}
callExecutor.execute(new HeadersRead());
}
@Override
public void messagesAvailable(final MessageProducer producer) {
PerfMark.startTask("ClientStreamListener.messagesAvailable", tag);
final Link link = PerfMark.link();
final class MessagesAvailable extends ContextRunnable {
MessagesAvailable() {
super(context);
}
@Override
public void runInContext() {
PerfMark.startTask("ClientCall$Listener.messagesAvailable", tag);
link.link();
try {
runInternal();
} finally {
PerfMark.stopTask("ClientCall$Listener.messagesAvailable", tag);
}
}
private void runInternal() {
public final void runInContext() {
if (closed) {
GrpcUtil.closeQuietly(producer);
return;
}
PerfMark.taskStart(tag, "ClientCall.messagesAvailable");
try {
InputStream message;
while ((message = producer.next()) != null) {
@ -606,15 +573,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
Status.CANCELLED.withCause(t).withDescription("Failed to read message.");
stream.cancel(status);
close(status, new Metadata());
} finally {
PerfMark.taskEnd(tag, "ClientCall.messagesAvailable");
}
}
}
try {
callExecutor.execute(new MessagesAvailable());
} finally {
PerfMark.stopTask("ClientStreamListener.messagesAvailable", tag);
}
callExecutor.execute(new MessagesAvailable());
}
/**
@ -638,16 +603,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
PerfMark.startTask("ClientStreamListener.closed", tag);
try {
closedInternal(status, rpcProgress, trailers);
} finally {
PerfMark.stopTask("ClientStreamListener.closed", tag);
}
}
private void closedInternal(
Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) {
Deadline deadline = effectiveDeadline();
if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
// When the server's deadline expires, it can only reset the stream with CANCEL and no
@ -661,29 +616,23 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
}
final Status savedStatus = status;
final Metadata savedTrailers = trailers;
final Link link = PerfMark.link();
final class StreamClosed extends ContextRunnable {
StreamClosed() {
super(context);
}
@Override
public void runInContext() {
PerfMark.startTask("ClientCall$Listener.onClose", tag);
link.link();
try {
runInternal();
} finally {
PerfMark.stopTask("ClientCall$Listener.onClose", tag);
}
}
private void runInternal() {
public final void runInContext() {
if (closed) {
// We intentionally don't keep the status or metadata from the server.
return;
}
close(savedStatus, savedTrailers);
PerfMark.taskStart(tag, "ClientCall.closed");
try {
close(savedStatus, savedTrailers);
} finally {
PerfMark.taskEnd(tag, "ClientCall.closed");
}
}
}
@ -692,26 +641,14 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void onReady() {
PerfMark.startTask("ClientStreamListener.onReady", tag);
final Link link = PerfMark.link();
final class StreamOnReady extends ContextRunnable {
StreamOnReady() {
super(context);
}
@Override
public void runInContext() {
PerfMark.startTask("ClientCall$Listener.onReady", tag);
link.link();
try {
runInternal();
} finally {
PerfMark.stopTask("ClientCall$Listener.onReady", tag);
}
}
private void runInternal() {
public final void runInContext() {
PerfMark.taskStart(tag, "ClientCall.onReady");
try {
observer.onReady();
} catch (Throwable t) {
@ -719,15 +656,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
Status.CANCELLED.withCause(t).withDescription("Failed to call onReady.");
stream.cancel(status);
close(status, new Metadata());
} finally {
PerfMark.taskEnd(tag, "ClientCall.onReady");
}
}
}
try {
callExecutor.execute(new StreamOnReady());
} finally {
PerfMark.stopTask("ClientStreamListener.onReady", tag);
}
callExecutor.execute(new StreamOnReady());
}
}
}

View File

@ -37,8 +37,8 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.Status;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
import io.grpc.perfmark.PerfMark;
import io.grpc.perfmark.PerfTag;
import java.io.InputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -54,7 +54,7 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
private final ServerStream stream;
private final MethodDescriptor<ReqT, RespT> method;
private final Tag tag;
private final PerfTag tag;
private final Context.CancellableContext context;
private final byte[] messageAcceptEncoding;
private final DecompressorRegistry decompressorRegistry;
@ -71,35 +71,31 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method,
Metadata inboundHeaders, Context.CancellableContext context,
DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
CallTracer serverCallTracer, Tag tag) {
CallTracer serverCallTracer) {
this.stream = stream;
this.method = method;
// TODO(carl-mastrangelo): consider moving this to the ServerImpl to record startCall.
this.tag = PerfMark.createTag(method.getFullMethodName());
this.context = context;
this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY);
this.decompressorRegistry = decompressorRegistry;
this.compressorRegistry = compressorRegistry;
this.serverCallTracer = serverCallTracer;
this.serverCallTracer.reportCallStarted();
this.tag = tag;
}
@Override
public void request(int numMessages) {
PerfMark.startTask("ServerCall.request", tag);
try {
stream.request(numMessages);
} finally {
PerfMark.stopTask("ServerCall.request", tag);
}
stream.request(numMessages);
}
@Override
public void sendHeaders(Metadata headers) {
PerfMark.startTask("ServerCall.sendHeaders", tag);
PerfMark.taskStart(tag, "ServerCall.sendHeaders");
try {
sendHeadersInternal(headers);
} finally {
PerfMark.stopTask("ServerCall.sendHeaders", tag);
PerfMark.taskEnd(tag, "ServerCall.sendHeaders");
}
}
@ -144,11 +140,11 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override
public void sendMessage(RespT message) {
PerfMark.startTask("ServerCall.sendMessage", tag);
PerfMark.taskStart(tag, "ServerCall.sendMessage");
try {
sendMessageInternal(message);
} finally {
PerfMark.stopTask("ServerCall.sendMessage", tag);
PerfMark.taskEnd(tag, "ServerCall.sendMessage");
}
}
@ -197,11 +193,11 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override
public void close(Status status, Metadata trailers) {
PerfMark.startTask("ServerCall.close", tag);
PerfMark.taskStart(tag, "ServerCall.close");
try {
closeInternal(status, trailers);
} finally {
PerfMark.stopTask("ServerCall.close", tag);
PerfMark.taskEnd(tag, "ServerCall.close");
}
}
@ -285,23 +281,15 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
MoreExecutors.directExecutor());
}
@Override
public void messagesAvailable(MessageProducer producer) {
PerfMark.startTask("ServerStreamListener.messagesAvailable", call.tag);
try {
messagesAvailableInternal(producer);
} finally {
PerfMark.stopTask("ServerStreamListener.messagesAvailable", call.tag);
}
}
@SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try
private void messagesAvailableInternal(final MessageProducer producer) {
@Override
public void messagesAvailable(final MessageProducer producer) {
if (call.cancelled) {
GrpcUtil.closeQuietly(producer);
return;
}
PerfMark.taskStart(call.tag, "ServerCall.messagesAvailable");
InputStream message;
try {
while ((message = producer.next()) != null) {
@ -317,58 +305,58 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
GrpcUtil.closeQuietly(producer);
Throwables.throwIfUnchecked(t);
throw new RuntimeException(t);
} finally {
PerfMark.taskEnd(call.tag, "ServerCall.messagesAvailable");
}
}
@Override
public void halfClosed() {
PerfMark.startTask("ServerStreamListener.halfClosed", call.tag);
try {
if (call.cancelled) {
return;
}
if (call.cancelled) {
return;
}
PerfMark.taskStart(call.tag, "ServerCall.halfClosed");
try {
listener.onHalfClose();
} finally {
PerfMark.stopTask("ServerStreamListener.halfClosed", call.tag);
PerfMark.taskEnd(call.tag, "ServerCall.halfClosed");
}
}
@Override
public void closed(Status status) {
PerfMark.startTask("ServerStreamListener.closed", call.tag);
PerfMark.taskStart(call.tag, "ServerCall.closed");
try {
closedInternal(status);
} finally {
PerfMark.stopTask("ServerStreamListener.closed", call.tag);
}
}
try {
if (status.isOk()) {
listener.onComplete();
} else {
call.cancelled = true;
listener.onCancel();
}
} finally {
// Cancel context after delivering RPC closure notification to allow the application to
// clean up and update any state based on whether onComplete or onCancel was called.
context.cancel(null);
private void closedInternal(Status status) {
try {
if (status.isOk()) {
listener.onComplete();
} else {
call.cancelled = true;
listener.onCancel();
}
} finally {
// Cancel context after delivering RPC closure notification to allow the application to
// clean up and update any state based on whether onComplete or onCancel was called.
context.cancel(null);
PerfMark.taskEnd(call.tag, "ServerCall.closed");
}
}
@Override
public void onReady() {
PerfMark.startTask("ServerStreamListener.onReady", call.tag);
if (call.cancelled) {
return;
}
PerfMark.taskStart(call.tag, "ServerCall.closed");
try {
if (call.cancelled) {
return;
}
listener.onReady();
} finally {
PerfMark.stopTask("ServerCall.closed", call.tag);
PerfMark.taskEnd(call.tag, "ServerCall.closed");
}
}
}

View File

@ -50,9 +50,6 @@ import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerTransportFilter;
import io.grpc.Status;
import io.perfmark.Link;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
@ -463,21 +460,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
transportClosed(transport);
}
@Override
public void streamCreated(ServerStream stream, String methodName, Metadata headers) {
Tag tag = PerfMark.createTag(methodName, stream.hashCode());
PerfMark.startTask("ServerTransportListener.streamCreated", tag);
try {
streamCreatedInternal(stream, methodName, headers, tag);
} finally {
PerfMark.stopTask("ServerTransportListener.streamCreated", tag);
}
}
private void streamCreatedInternal(
final ServerStream stream, final String methodName, final Metadata headers, final Tag tag) {
public void streamCreated(
final ServerStream stream, final String methodName, final Metadata headers) {
if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
String encoding = headers.get(MESSAGE_ENCODING_KEY);
Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding);
@ -504,33 +489,22 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
wrappedExecutor = new SerializingExecutor(executor);
}
final Link link = PerfMark.link();
final JumpToApplicationThreadServerStreamListener jumpListener
= new JumpToApplicationThreadServerStreamListener(
wrappedExecutor, executor, stream, context, tag);
wrappedExecutor, executor, stream, context);
stream.setListener(jumpListener);
// Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks
// are delivered, including any errors. Callbacks can still be triggered, but they will be
// queued.
final class StreamCreated extends ContextRunnable {
StreamCreated() {
super(context);
}
@Override
public void runInContext() {
PerfMark.startTask("ServerTransportListener$StreamCreated.startCall", tag);
link.link();
try {
runInternal();
} finally {
PerfMark.stopTask("ServerTransportListener$StreamCreated.startCall", tag);
}
}
private void runInternal() {
ServerStreamListener listener = NOOP_LISTENER;
try {
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
@ -549,8 +523,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
context.cancel(null);
return;
}
listener =
startCall(stream, methodName, method, headers, context, statsTraceCtx, tag);
listener = startCall(stream, methodName, method, headers, context, statsTraceCtx);
} catch (RuntimeException e) {
stream.close(Status.fromThrowable(e), new Metadata());
context.cancel(null);
@ -600,7 +573,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
/** Never returns {@code null}. */
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) {
Context.CancellableContext context, StatsTraceContext statsTraceCtx) {
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
statsTraceCtx.serverCallStarted(
new ServerCallInfoImpl<>(
@ -614,7 +587,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
ServerMethodDefinition<?, ?> wMethodDef = binlog == null
? interceptedDef : binlog.wrapMethodDefinition(interceptedDef);
return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context, tag);
return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context);
}
private <WReqT, WRespT> ServerStreamListener startWrappedCall(
@ -622,8 +595,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
ServerMethodDefinition<WReqT, WRespT> methodDef,
ServerStream stream,
Metadata headers,
Context.CancellableContext context,
Tag tag) {
Context.CancellableContext context) {
ServerCallImpl<WReqT, WRespT> call = new ServerCallImpl<>(
stream,
@ -632,8 +604,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
context,
decompressorRegistry,
compressorRegistry,
serverCallTracer,
tag);
serverCallTracer);
ServerCall.Listener<WReqT> listener =
methodDef.getServerCallHandler().startCall(call, headers);
@ -716,17 +687,15 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
private final Executor cancelExecutor;
private final Context.CancellableContext context;
private final ServerStream stream;
private final Tag tag;
// Only accessed from callExecutor.
private ServerStreamListener listener;
public JumpToApplicationThreadServerStreamListener(Executor executor,
Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) {
Executor cancelExecutor, ServerStream stream, Context.CancellableContext context) {
this.callExecutor = executor;
this.cancelExecutor = cancelExecutor;
this.stream = stream;
this.context = context;
this.tag = tag;
}
/**
@ -756,8 +725,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override
public void messagesAvailable(final MessageProducer producer) {
PerfMark.startTask("ServerStreamListener.messagesAvailable", tag);
final Link link = PerfMark.link();
final class MessagesAvailable extends ContextRunnable {
@ -767,8 +734,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override
public void runInContext() {
PerfMark.startTask("ServerCallListener(app).messagesAvailable", tag);
link.link();
try {
getListener().messagesAvailable(producer);
} catch (RuntimeException e) {
@ -777,24 +742,15 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
} catch (Error e) {
internalClose();
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag);
}
}
}
try {
callExecutor.execute(new MessagesAvailable());
} finally {
PerfMark.stopTask("ServerStreamListener.messagesAvailable", tag);
}
callExecutor.execute(new MessagesAvailable());
}
@Override
public void halfClosed() {
PerfMark.startTask("ServerStreamListener.halfClosed", tag);
final Link link = PerfMark.link();
final class HalfClosed extends ContextRunnable {
HalfClosed() {
super(context);
@ -802,8 +758,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override
public void runInContext() {
PerfMark.startTask("ServerCallListener(app).halfClosed", tag);
link.link();
try {
getListener().halfClosed();
} catch (RuntimeException e) {
@ -812,30 +766,15 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
} catch (Error e) {
internalClose();
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).halfClosed", tag);
}
}
}
try {
callExecutor.execute(new HalfClosed());
} finally {
PerfMark.stopTask("ServerStreamListener.halfClosed", tag);
}
callExecutor.execute(new HalfClosed());
}
@Override
public void closed(final Status status) {
PerfMark.startTask("ServerStreamListener.closed", tag);
try {
closedInternal(status);
} finally {
PerfMark.stopTask("ServerStreamListener.closed", tag);
}
}
private void closedInternal(final Status status) {
// For cancellations, promptly inform any users of the context that their work should be
// aborted. Otherwise, we can wait until pending work is done.
if (!status.isOk()) {
@ -843,7 +782,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
// is not serializing.
cancelExecutor.execute(new ContextCloser(context, status.getCause()));
}
final Link link = PerfMark.link();
final class Closed extends ContextRunnable {
Closed() {
@ -852,13 +790,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override
public void runInContext() {
PerfMark.startTask("ServerCallListener(app).closed", tag);
link.link();
try {
getListener().closed(status);
} finally {
PerfMark.stopTask("ServerCallListener(app).closed", tag);
}
getListener().closed(status);
}
}
@ -867,8 +799,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override
public void onReady() {
PerfMark.startTask("ServerStreamListener.onReady", tag);
final Link link = PerfMark.link();
final class OnReady extends ContextRunnable {
OnReady() {
super(context);
@ -876,8 +806,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override
public void runInContext() {
PerfMark.startTask("ServerCallListener(app).onReady", tag);
link.link();
try {
getListener().onReady();
} catch (RuntimeException e) {
@ -886,17 +814,11 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
} catch (Error e) {
internalClose();
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).onReady", tag);
}
}
}
try {
callExecutor.execute(new OnReady());
} finally {
PerfMark.stopTask("ServerStreamListener.onReady", tag);
}
callExecutor.execute(new OnReady());
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.perfmark;
/**
* Internal {@link PerfTag.TagFactory} and {@link PerfMarkTask} accessor. This is intended for use
* by io.grpc.perfmark, and the specifically supported packages that utilize PerfMark. If you
* *really* think you need to use this, contact the gRPC team first.
*/
public final class InternalPerfMark {
private InternalPerfMark() {}
/** Expose class to allow packages that utilize PerfMark to get PerfMarkTask instances. */
public abstract static class InternalPerfMarkTask extends PerfMarkTask {
public InternalPerfMarkTask() {}
}
/** Expose methods that create PerfTag to packages that utilize PerfMark. */
private static final long NULL_NUMERIC_TAG = 0;
private static final String NULL_STRING_TAG = null;
public static PerfTag createPerfTag(long numericTag, String stringTag) {
return PerfTag.TagFactory.create(numericTag, stringTag);
}
public static PerfTag createPerfTag(String stringTag) {
return PerfTag.TagFactory.create(NULL_NUMERIC_TAG, stringTag);
}
public static PerfTag createPerfTag(long numericTag) {
return PerfTag.TagFactory.create(numericTag, NULL_STRING_TAG);
}
}

View File

@ -0,0 +1,171 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.perfmark;
import com.google.errorprone.annotations.CompileTimeConstant;
import io.grpc.perfmark.PerfTag.TagFactory;
/**
* PerfMark is a collection of stub methods for marking key points in the RPC lifecycle. This
* class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this yet.
*/
public final class PerfMark {
private PerfMark() {
throw new AssertionError("nope");
}
/**
* Start a Task with a Tag to identify it; a task represents some work that spans some time, and
* you are interested in both its start time and end time.
*
* @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't
* use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task
* does not have a numeric tag associated. In this case, you are encouraged to use {@link
* #taskStart(String)} or {@link PerfTag#create(String)}.
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this task.
*/
public static void taskStart(PerfTag tag, @CompileTimeConstant String taskName) {}
/**
* Start a Task; a task represents some work that spans some time, and you are interested in both
* its start time and end time.
*
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this task.
*/
public static void taskStart(@CompileTimeConstant String taskName) {}
/**
* End a Task with a Tag to identify it; a task represents some work that spans some time, and
* you are interested in both its start time and end time.
*
* @param tag a Tag object associated with the task start. This should be the tag used for the
* corresponding {@link #taskStart(PerfTag, String)} call.
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this task. This should
* be the name used by the corresponding {@link #taskStart(PerfTag, String)} call.
*/
public static void taskEnd(PerfTag tag, @CompileTimeConstant String taskName) {}
/**
* End a Task with a Tag to identify it; a task represents some work that spans some time, and
* you are interested in both its start time and end time.
*
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this task. This should
* be the name used by the corresponding {@link #taskStart(String)} call.
*/
public static void taskEnd(@CompileTimeConstant String taskName) {}
/**
* Start a Task with a Tag to identify it in a try-with-resource statement; a task represents some
* work that spans some time, and you are interested in both its start time and end time.
*
* <p>Use this in a try-with-resource statement so that task will end automatically.
*
* @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't
* use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task
* does not have a numeric tag associated. In this case, you are encouraged to use {@link
* #task(String)} or {@link PerfTag#create(String)}.
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this task.
*/
public static PerfMarkTask task(PerfTag tag, @CompileTimeConstant String taskName) {
return NoopTask.INSTANCE;
}
/**
* Start a Task it in a try-with-resource statement; a task represents some work that spans some
* time, and you are interested in both its start time and end time.
*
* <p>Use this in a try-with-resource statement so that task will end automatically.
*
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this task.
*/
public static PerfMarkTask task(@CompileTimeConstant String taskName) {
return NoopTask.INSTANCE;
}
/**
* Records an Event with a Tag to identify it.
*
* <p>An Event is different from a Task in that you don't care how much time it spanned. You are
* interested in only the time it happened.
*
* @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't
* use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task
* does not have a numeric tag associated. In this case, you are encouraged to use {@link
* #event(String)} or {@link PerfTag#create(String)}.
* @param eventName The name of the event. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this event.
*/
public static void event(PerfTag tag, @CompileTimeConstant String eventName) {}
/**
* Records an Event.
*
* <p>An Event is different from a Task in that you don't care how much time it spanned. You are
* interested in only the time it happened.
*
* @param eventName The name of the event. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this event.
*/
public static void event(@CompileTimeConstant String eventName) {}
/**
* If PerfMark instrumentation is not enabled, returns a Tag with numericTag = 0L. Replacement
* for {@link TagFactory#create(long, String)} if PerfMark agent is enabled.
*
*/
public static PerfTag createTag(
@SuppressWarnings("unused") long numericTag, @SuppressWarnings("unused") String stringTag) {
// Warning suppression is safe as this method returns by default the NULL_PERF_TAG
return NULL_PERF_TAG;
}
/**
* If PerfMark instrumentation is not enabled returns a Tag with numericTag = 0L. Replacement
* for {@link TagFactory#create(String)} if PerfMark agent is enabled.
*/
public static PerfTag createTag(@SuppressWarnings("unused") String stringTag) {
// Warning suppression is safe as this method returns by default the NULL_PERF_TAG
return NULL_PERF_TAG;
}
/**
* If PerfMark instrumentation is not enabled returns a Tag with numericTag = 0L. Replacement
* for {@link TagFactory#create(long)} if PerfMark agent is enabled.
*/
public static PerfTag createTag(@SuppressWarnings("unused") long numericTag) {
// Warning suppression is safe as this method returns by default the NULL_PERF_TAG
return NULL_PERF_TAG;
}
private static final PerfTag NULL_PERF_TAG = TagFactory.create();
private static final class NoopTask extends PerfMarkTask {
private static final PerfMarkTask INSTANCE = new NoopTask();
NoopTask() {}
@Override
public void close() {}
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.perfmark;
import java.io.Closeable;
/**
* This class exists to make it easier for users to use the try-with-resource shorthand for
* starting and ending a PerfMark Task. This class is {@link io.grpc.Internal} and
* {@link io.grpc.ExperimentalApi}. Do not use this yet.
*/
public abstract class PerfMarkTask implements Closeable {
@Override
public abstract void close();
PerfMarkTask() {}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.perfmark;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation to add PerfMark instrumentation points surrounding method invocation.
*
* <p>This class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this
* yet.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
// TODO(carl-mastrangelo): Add this line back in and make it okay on Android
//@IncompatibleModifiers(value = {Modifier.ABSTRACT, Modifier.NATIVE})
public @interface PerfMarker {
/**
* The name of the task; e.g. `parseHeaders`.
*/
String taskName();
/**
* An optional computed tag.
*
* <p>There are 3 supported references that can be used
* <ul>
* <li>{@code "this"}: Then the tag will be the {@link Object#toString} of the current class.
* Only valid for instance methods.
* <li>{@code "someFieldName"}: Then the tag will be the result of
* calling {@link String#valueOf(Object)} on the field. The field cannot be a primitive or
* and array type. (Though we may revisit this in the future).
* <li>{@code "$N"}: Then the tag will be the result of calling {@link String#valueOf(Object)}
* on the Nth method parameter. Parameters are {@code 0} indexed so {@code "$1"} is the
* second parameter. The referenced parameter cannot be a primitive or an array type.
* (Though we may revisit this in the future).
* </ul>
*
* <p>In general you should reference either {@code "this"} or {@code final} fields since
* in these cases we can cache the operations to decrease the cost of computing the tags. A side
* effect of this is that for such references we will not have their tags recalculated after the
* first time. Thus it is best to use immutable objects for tags.
*/
String computedTag() default "";
/**
* True if class with annotation is immutable and instrumentation must adhere to this restriction.
* If enableSampling is passed as argument to the agent, instrumentation points with <code>
* immutable = true </code> are ignored.
*/
boolean immutable() default false;
}

View File

@ -0,0 +1,118 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.perfmark;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
/**
* A Tag is used to provide additional information to identify a task and consists of a 64-bit
* integer value and a string.
*
* <p>Both the {@code numericTag} and the {@code stringTag} are optional. The {@code numericTag}
* value can be used to identify the specific task being worked on (e.g. the id of the rpc call).
* The {@code stringTag} can be used to store any value that is not a compile-time constant (a
* restriction imposed for the name passed to PerfMark tasks and events). A value of 0 for the
* {@code numericTag} is considered null. Don't use 0 for the {@code numericTag} unless you intend
* to specify null. In that case you are encouraged to use {@link #create(String)}.
*
* <p>Invocations to {@code create} methods in this class are a no-op unless PerfMark
* instrumentation is enabled. If so, calls to {@code create} methods to this class are replaced for
* calls to {@link TagFactory} create methods.
*
* <p>This class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this
* yet.
*/
@Immutable
public final class PerfTag {
private static final long NULL_NUMERIC_TAG = 0;
private static final String NULL_STRING_TAG = null;
private final long numericTag;
private final String stringTag;
private PerfTag(long numericTag, @Nullable String stringTag) {
this.numericTag = numericTag;
this.stringTag = stringTag;
}
/** Returns the numeric tag if set, or {@link #NULL_NUMERIC_TAG} instead. */
public long getNumericTag() {
return numericTag;
}
/** Returns the string tag if set, or {@link #NULL_STRING_TAG} instead. */
@Nullable public String getStringTag() {
return stringTag;
}
@Override
public String toString() {
return "Tag(numericTag=" + numericTag + ",stringTag='" + stringTag + "')";
}
@Override
public int hashCode() {
int longHashCode = (int)(numericTag ^ (numericTag >>> 32));
return longHashCode + (stringTag != null ? stringTag.hashCode() : 31);
}
@Override
@SuppressWarnings("ReferenceEquality") // No Java 8 yet.
public boolean equals(Object obj) {
if (!(obj instanceof PerfTag)) {
return false;
}
PerfTag that = (PerfTag) obj;
return numericTag == that.numericTag
&& (stringTag == that.stringTag || (stringTag != null && stringTag.equals(that.stringTag)));
}
/**
* Provides methods that create Tag instances which should not be directly invoked by clients.
*
* <p>Calls to {@link PerfMark#create(long)}, {@link PerfMark#create(long, String)} and {@link
* PerfMark#create(String)} are replaced with calls to the methods in this class using bytecode
* rewriting, if enabled.
*/
static final class TagFactory {
/**
* This class should not be instantiated.
*/
private TagFactory() {
throw new AssertionError("nope");
}
public static PerfTag create(long numericTag, String stringTag) {
return new PerfTag(numericTag, stringTag);
}
public static PerfTag create(String stringTag) {
return new PerfTag(NULL_NUMERIC_TAG, stringTag);
}
public static PerfTag create(long numericTag) {
return new PerfTag(numericTag, NULL_STRING_TAG);
}
static PerfTag create() {
return new PerfTag(NULL_NUMERIC_TAG, NULL_STRING_TAG);
}
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This is an internal, experimental API and not subject to the normal compatibility guarantees.
*
* @see io.grpc.Internal
*/
@javax.annotation.CheckReturnValue
@javax.annotation.ParametersAreNonnullByDefault
package io.grpc.perfmark;

View File

@ -43,7 +43,6 @@ import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl;
import io.grpc.internal.testing.SingleMessageProducer;
import io.perfmark.PerfMark;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
@ -91,7 +90,7 @@ public class ServerCallImplTest {
context = Context.ROOT.withCancellation();
call = new ServerCallImpl<>(stream, UNARY_METHOD, requestHeaders, context,
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
serverCallTracer, PerfMark.createTag());
serverCallTracer);
}
@Test
@ -114,7 +113,7 @@ public class ServerCallImplTest {
call = new ServerCallImpl<>(stream, UNARY_METHOD, requestHeaders, context,
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
tracer, PerfMark.createTag());
tracer);
// required boilerplate
call.sendHeaders(new Metadata());
@ -225,8 +224,7 @@ public class ServerCallImplTest {
context,
DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(),
serverCallTracer,
PerfMark.createTag());
serverCallTracer);
serverCall.sendHeaders(new Metadata());
serverCall.sendMessage(1L);
verify(stream, times(1)).writeMessage(any(InputStream.class));
@ -260,8 +258,7 @@ public class ServerCallImplTest {
context,
DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(),
serverCallTracer,
PerfMark.createTag());
serverCallTracer);
serverCall.sendHeaders(new Metadata());
serverCall.sendMessage(1L);
serverCall.sendMessage(1L);
@ -298,8 +295,7 @@ public class ServerCallImplTest {
context,
DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(),
serverCallTracer,
PerfMark.createTag());
serverCallTracer);
serverCall.close(Status.OK, new Metadata());
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(stream, times(1)).cancel(statusCaptor.capture());

View File

@ -77,7 +77,6 @@ import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener;
import io.grpc.internal.testing.SingleMessageProducer;
import io.grpc.internal.testing.TestServerStreamTracer;
import io.grpc.util.MutableHandlerRegistry;
import io.perfmark.PerfMark;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
@ -1141,8 +1140,7 @@ public class ServerImplTest {
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation(),
PerfMark.createTag());
Context.ROOT.withCancellation());
ServerStreamListener mockListener = mock(ServerStreamListener.class);
listener.setListener(mockListener);
@ -1167,8 +1165,7 @@ public class ServerImplTest {
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation(),
PerfMark.createTag());
Context.ROOT.withCancellation());
ServerStreamListener mockListener = mock(ServerStreamListener.class);
listener.setListener(mockListener);
@ -1193,8 +1190,7 @@ public class ServerImplTest {
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation(),
PerfMark.createTag());
Context.ROOT.withCancellation());
ServerStreamListener mockListener = mock(ServerStreamListener.class);
listener.setListener(mockListener);
@ -1217,8 +1213,7 @@ public class ServerImplTest {
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation(),
PerfMark.createTag());
Context.ROOT.withCancellation());
ServerStreamListener mockListener = mock(ServerStreamListener.class);
listener.setListener(mockListener);
@ -1241,8 +1236,7 @@ public class ServerImplTest {
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation(),
PerfMark.createTag());
Context.ROOT.withCancellation());
ServerStreamListener mockListener = mock(ServerStreamListener.class);
listener.setListener(mockListener);
@ -1265,8 +1259,7 @@ public class ServerImplTest {
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation(),
PerfMark.createTag());
Context.ROOT.withCancellation());
ServerStreamListener mockListener = mock(ServerStreamListener.class);
listener.setListener(mockListener);

View File

@ -25,6 +25,5 @@ java_library(
"@io_netty_netty_handler_proxy//jar",
"@io_netty_netty_resolver//jar",
"@io_netty_netty_transport//jar",
"@io_perfmark_perfmark_api//jar",
],
)

View File

@ -43,7 +43,6 @@ import io.netty.channel.EventLoop;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString;
import io.perfmark.PerfMark;
import javax.annotation.Nullable;
/**
@ -115,18 +114,8 @@ class NettyClientStream extends AbstractClientStream {
}
private class Sink implements AbstractClientStream.Sink {
@Override
public void writeHeaders(Metadata headers, byte[] requestPayload) {
PerfMark.startTask("NettyClientStream$Sink.writeHeaders");
try {
writeHeadersInternal(headers, requestPayload);
} finally {
PerfMark.stopTask("NettyClientStream$Sink.writeHeaders");
}
}
private void writeHeadersInternal(Metadata headers, byte[] requestPayload) {
// Convert the headers into Netty HTTP/2 headers.
AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method);
if (defaultPath == null) {
@ -163,13 +152,15 @@ class NettyClientStream extends AbstractClientStream {
}
}
};
// Write the command requesting the creation of the stream.
writeQueue.enqueue(
new CreateStreamCommand(http2Headers, transportState(), shouldBeCountedForInUse(), get),
!method.getType().clientSendsOneMessage() || get).addListener(failureListener);
}
private void writeFrameInternal(
@Override
public void writeFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) {
Preconditions.checkArgument(numMessages >= 0);
ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf();
@ -193,23 +184,12 @@ class NettyClientStream extends AbstractClientStream {
});
} else {
// The frame is empty and will not impact outbound flow control. Just send it.
writeQueue.enqueue(
new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush);
writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush);
}
}
@Override
public void writeFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
PerfMark.startTask("NettyClientStream$Sink.writeFrame");
try {
writeFrameInternal(frame, endOfStream, flush, numMessages);
} finally {
PerfMark.stopTask("NettyClientStream$Sink.writeFrame");
}
}
private void requestInternal(final int numMessages) {
public void request(final int numMessages) {
if (channel.eventLoop().inEventLoop()) {
// Processing data read in the event loop so can call into the deframer immediately
transportState().requestMessagesFromDeframer(numMessages);
@ -223,24 +203,9 @@ class NettyClientStream extends AbstractClientStream {
}
}
@Override
public void request(int numMessages) {
PerfMark.startTask("NettyClientStream$Sink.request");
try {
requestInternal(numMessages);
} finally {
PerfMark.stopTask("NettyClientStream$Sink.request");
}
}
@Override
public void cancel(Status status) {
PerfMark.startTask("NettyClientStream$Sink.cancel");
try {
writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true);
} finally {
PerfMark.stopTask("NettyClientStream$Sink.cancel");
}
writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true);
}
}

View File

@ -33,7 +33,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.perfmark.PerfMark;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -92,8 +91,8 @@ class NettyServerStream extends AbstractServerStream {
}
private class Sink implements AbstractServerStream.Sink {
private void requestInternal(final int numMessages) {
@Override
public void request(final int numMessages) {
if (channel.eventLoop().inEventLoop()) {
// Processing data read in the event loop so can call into the deframer immediately
transportState().requestMessagesFromDeframer(numMessages);
@ -108,30 +107,16 @@ class NettyServerStream extends AbstractServerStream {
}
@Override
public void request(final int numMessages) {
PerfMark.startTask("NettyServerStream$Sink.request");
try {
requestInternal(numMessages);
} finally {
PerfMark.stopTask("NettyServerStream$Sink.request");
}
public void writeHeaders(Metadata headers) {
writeQueue.enqueue(
SendResponseHeadersCommand.createHeaders(
transportState(),
Utils.convertServerHeaders(headers)),
true);
}
@Override
public void writeHeaders(Metadata headers) {
PerfMark.startTask("NettyServerStream$Sink.writeHeaders");
try {
writeQueue.enqueue(
SendResponseHeadersCommand.createHeaders(
transportState(),
Utils.convertServerHeaders(headers)),
true);
} finally {
PerfMark.stopTask("NettyServerStream$Sink.writeHeaders");
}
}
private void writeFrameInternal(WritableBuffer frame, boolean flush, final int numMessages) {
public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) {
Preconditions.checkArgument(numMessages >= 0);
if (frame == null) {
writeQueue.scheduleFlush();
@ -155,37 +140,17 @@ class NettyServerStream extends AbstractServerStream {
});
}
@Override
public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) {
PerfMark.startTask("NettyServerStream$Sink.writeFrame");
try {
writeFrameInternal(frame, flush, numMessages);
} finally {
PerfMark.stopTask("NettyServerStream$Sink.writeFrame");
}
}
@Override
public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
PerfMark.startTask("NettyServerStream$Sink.writeTrailers");
try {
Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent);
writeQueue.enqueue(
SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status),
true);
} finally {
PerfMark.stopTask("NettyServerStream$Sink.writeTrailers");
}
Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent);
writeQueue.enqueue(
SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status),
true);
}
@Override
public void cancel(Status status) {
PerfMark.startTask("NettyServerStream$Sink.cancel");
try {
writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true);
} finally {
PerfMark.startTask("NettyServerStream$Sink.cancel");
}
writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true);
}
}

View File

@ -21,8 +21,6 @@ import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.DefaultByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.perfmark.Link;
import io.perfmark.PerfMark;
/**
* Command sent from the transport to the Netty channel to send a GRPC frame to the remote endpoint.
@ -30,7 +28,6 @@ import io.perfmark.PerfMark;
final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.QueuedCommand {
private final StreamIdHolder stream;
private final boolean endStream;
private final Link link;
private ChannelPromise promise;
@ -38,12 +35,6 @@ final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQu
super(content);
this.stream = stream;
this.endStream = endStream;
this.link = PerfMark.link();
}
@Override
public Link getLink() {
return link;
}
int streamId() {

View File

@ -22,8 +22,6 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.perfmark.Link;
import io.perfmark.PerfMark;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@ -106,44 +104,26 @@ class WriteQueue {
* called in the event loop
*/
private void flush() {
PerfMark.startTask("WriteQueue.periodicFlush");
try {
QueuedCommand cmd;
int i = 0;
boolean flushedOnce = false;
while ((cmd = queue.poll()) != null) {
PerfMark.startTask("WriteQueue.run");
try {
cmd.getLink().link();
cmd.run(channel);
} finally {
PerfMark.stopTask("WriteQueue.run");
}
cmd.run(channel);
if (++i == DEQUE_CHUNK_SIZE) {
i = 0;
// Flush each chunk so we are releasing buffers periodically. In theory this loop
// might never end as new events are continuously added to the queue, if we never
// flushed in that case we would be guaranteed to OOM.
PerfMark.startTask("WriteQueue.flush0");
try {
channel.flush();
} finally {
PerfMark.stopTask("WriteQueue.flush0");
}
channel.flush();
flushedOnce = true;
}
}
// Must flush at least once, even if there were no writes.
if (i != 0 || !flushedOnce) {
PerfMark.startTask("WriteQueue.flush1");
try {
channel.flush();
} finally {
PerfMark.stopTask("WriteQueue.flush1");
}
channel.flush();
}
} finally {
PerfMark.stopTask("WriteQueue.periodicFlush");
// Mark the write as done, if the queue is non-empty after marking trigger a new write.
scheduled.set(false);
if (!queue.isEmpty()) {
@ -154,10 +134,8 @@ class WriteQueue {
private static class RunnableCommand implements QueuedCommand {
private final Runnable runnable;
private final Link link;
public RunnableCommand(Runnable runnable) {
this.link = PerfMark.link();
this.runnable = runnable;
}
@ -175,21 +153,11 @@ class WriteQueue {
public final void run(Channel channel) {
runnable.run();
}
@Override
public Link getLink() {
return link;
}
}
abstract static class AbstractQueuedCommand implements QueuedCommand {
private ChannelPromise promise;
private final Link link;
AbstractQueuedCommand() {
this.link = PerfMark.link();
}
@Override
public final void promise(ChannelPromise promise) {
@ -205,11 +173,6 @@ class WriteQueue {
public final void run(Channel channel) {
channel.write(this, promise);
}
@Override
public Link getLink() {
return link;
}
}
/**
@ -227,7 +190,5 @@ class WriteQueue {
void promise(ChannelPromise promise);
void run(Channel channel);
Link getLink();
}
}

View File

@ -35,7 +35,6 @@ def grpc_java_repositories(
omit_io_netty_tcnative_boringssl_static = False,
omit_io_opencensus_api = False,
omit_io_opencensus_grpc_metrics = False,
omit_io_perfmark = False,
omit_javax_annotation = False,
omit_junit_junit = False,
omit_net_zlib = False,
@ -104,8 +103,6 @@ def grpc_java_repositories(
io_opencensus_api()
if not omit_io_opencensus_grpc_metrics:
io_opencensus_grpc_metrics()
if not omit_io_perfmark:
io_perfmark()
if not omit_javax_annotation:
javax_annotation()
if not omit_junit_junit:
@ -405,15 +402,6 @@ def io_opencensus_grpc_metrics():
licenses = ["notice"], # Apache 2.0
)
def io_perfmark():
jvm_maven_import_external(
name = "io_perfmark_perfmark_api",
artifact = "io.perfmark:perfmark-api:0.16.0",
server_urls = ["http://central.maven.org/maven2"],
artifact_sha256 = "a93667875ea9d10315177768739a18d6c667df041c982d2841645ae8558d0af0",
licenses = ["notice"], # Apache 2.0
)
def javax_annotation():
# Use //stub:javax_annotation for neverlink=1 support.
jvm_maven_import_external(