core, netty: add io.perfmark Annotations

This add perfmark annotations in some key places, notably on transport/application boundaries, and thread hop locations. Perfmark records to a thread-local buffer the events that happen in each thread. Perfmark is disabled by default, and will compile to a noop unless Perfmark.setEnabled is invoked. This should make it free when disable, and pretty fast when it is enabled.

It is important that started tasks are ended, so several places in our code are moved to either try-finally blocks, or moved into a private method. I realize this is ugly, but I think it is manageable. In the future, we can look at making an agent or compiler plugin that simplifies the recording.

Linking between threads is done with a Link object, which is created on the "outbound" task, and used on the "inbound" task. This is slightly more verbose, and does has a small amount of runtime overhead, even when disabled. (for null checks, slightly higher memory usage, etc.) I think this is okay to, because it makes other optimizations much easier.
This commit is contained in:
Carl Mastrangelo 2019-06-06 17:58:49 -07:00 committed by GitHub
parent 5d0c283b46
commit 71967622d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 428 additions and 615 deletions

View File

@ -201,6 +201,7 @@ subprojects {
opencensus_impl: "io.opencensus:opencensus-impl:${opencensusVersion}",
opencensus_impl_lite: "io.opencensus:opencensus-impl-lite:${opencensusVersion}",
instrumentation_api: 'com.google.instrumentation:instrumentation-api:0.4.3',
perfmark: 'io.perfmark:perfmark-api:0.16.0',
protobuf: "com.google.protobuf:protobuf-java:${protobufVersion}",
protobuf_lite: "com.google.protobuf:protobuf-lite:3.0.1",
protoc_lite: "com.google.protobuf:protoc-gen-javalite:3.0.0",

View File

@ -1,16 +1,3 @@
PERFMARK_INTERNAL_ACCESSOR_SRCS = glob(
[
"src/main/java/io/grpc/perfmark/Internal*.java",
],
)
PERFMARK_SRCS = glob(
[
"src/main/java/io/grpc/perfmark/*.java",
],
exclude = PERFMARK_INTERNAL_ACCESSOR_SRCS,
)
java_library(
name = "core",
visibility = ["//visibility:public"],
@ -43,7 +30,6 @@ java_library(
]),
visibility = ["//:__subpackages__"],
deps = [
":perfmark",
"//api",
"//context",
"@com_google_android_annotations//jar",
@ -54,6 +40,7 @@ java_library(
"@com_google_j2objc_j2objc_annotations//jar",
"@io_opencensus_opencensus_api//jar",
"@io_opencensus_opencensus_contrib_grpc_metrics//jar",
"@io_perfmark_perfmark_api//jar",
"@org_codehaus_mojo_animal_sniffer_annotations//jar",
],
)
@ -76,12 +63,3 @@ java_library(
],
)
java_library(
name = "perfmark",
srcs = PERFMARK_SRCS,
visibility = ["//:__subpackages__"],
deps = [
"@com_google_code_findbugs_jsr305//jar",
"@com_google_errorprone_error_prone_annotations//jar",
],
)

View File

@ -4,6 +4,7 @@ dependencies {
compile project(':grpc-api'),
libraries.gson,
libraries.android_annotations,
libraries.perfmark
compile (libraries.opencensus_api) {
// prefer 3.0.2 from libraries instead of 3.0.1
exclude group: 'com.google.code.findbugs', module: 'jsr305'

View File

@ -46,8 +46,9 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.perfmark.PerfMark;
import io.grpc.perfmark.PerfTag;
import io.perfmark.Link;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.concurrent.CancellationException;
@ -69,7 +70,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
= "gzip".getBytes(Charset.forName("US-ASCII"));
private final MethodDescriptor<ReqT, RespT> method;
private final PerfTag tag;
private final Tag tag;
private final Executor callExecutor;
private final CallTracer channelCallsTracer;
private final Context context;
@ -96,7 +97,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
boolean retryEnabled) {
this.method = method;
// TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
this.tag = PerfMark.createTag(method.getFullMethodName());
this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this));
// If we know that the executor is a direct executor, we don't need to wrap it with a
// SerializingExecutor. This is purely for performance reasons.
// See https://github.com/grpc/grpc-java/issues/368
@ -112,6 +113,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
this.clientTransportProvider = clientTransportProvider;
this.deadlineCancellationExecutor = deadlineCancellationExecutor;
this.retryEnabled = retryEnabled;
PerfMark.event("ClientCall.<init>", tag);
}
private final class ContextCancellationListener implements CancellationListener {
@ -183,11 +185,11 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void start(Listener<RespT> observer, Metadata headers) {
PerfMark.taskStart(tag, "ClientCall.start");
PerfMark.startTask("ClientCall.start", tag);
try {
startInternal(observer, headers);
} finally {
PerfMark.taskEnd(tag, "ClientCall.start");
PerfMark.stopTask("ClientCall.start", tag);
}
}
@ -378,18 +380,23 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void request(int numMessages) {
checkState(stream != null, "Not started");
checkArgument(numMessages >= 0, "Number requested must be non-negative");
stream.request(numMessages);
PerfMark.startTask("ClientCall.request", tag);
try {
checkState(stream != null, "Not started");
checkArgument(numMessages >= 0, "Number requested must be non-negative");
stream.request(numMessages);
} finally {
PerfMark.stopTask("ClientCall.cancel", tag);
}
}
@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
PerfMark.taskStart(tag, "ClientCall.cancel");
PerfMark.startTask("ClientCall.cancel", tag);
try {
cancelInternal(message, cause);
} finally {
PerfMark.taskEnd(tag, "ClientCall.cancel");
PerfMark.stopTask("ClientCall.cancel", tag);
}
}
@ -424,11 +431,11 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void halfClose() {
PerfMark.taskStart(tag, "ClientCall.halfClose");
PerfMark.startTask("ClientCall.halfClose", tag);
try {
halfCloseInternal();
} finally {
PerfMark.taskEnd(tag, "ClientCall.halfClose");
PerfMark.stopTask("ClientCall.halfClose", tag);
}
}
@ -442,11 +449,11 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void sendMessage(ReqT message) {
PerfMark.taskStart(tag, "ClientCall.sendMessage");
PerfMark.startTask("ClientCall.sendMessage", tag);
try {
sendMessageInternal(message);
} finally {
PerfMark.taskEnd(tag, "ClientCall.sendMessage");
PerfMark.stopTask("ClientCall.sendMessage", tag);
}
}
@ -515,17 +522,29 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void headersRead(final Metadata headers) {
PerfMark.startTask("ClientStreamListener.headersRead", tag);
final Link link = PerfMark.link();
final class HeadersRead extends ContextRunnable {
HeadersRead() {
super(context);
}
@Override
public final void runInContext() {
public void runInContext() {
PerfMark.startTask("ClientCall$Listener.headersRead", tag);
link.link();
try {
runInternal();
} finally {
PerfMark.stopTask("ClientCall$Listener.headersRead", tag);
}
}
private void runInternal() {
if (closed) {
return;
}
PerfMark.taskStart(tag, "ClientCall.headersRead");
try {
observer.onHeaders(headers);
} catch (Throwable t) {
@ -533,29 +552,43 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
Status.CANCELLED.withCause(t).withDescription("Failed to read headers");
stream.cancel(status);
close(status, new Metadata());
} finally {
PerfMark.taskEnd(tag, "ClientCall.headersRead");
}
}
}
callExecutor.execute(new HeadersRead());
try {
callExecutor.execute(new HeadersRead());
} finally {
PerfMark.stopTask("ClientStreamListener.headersRead", tag);
}
}
@Override
public void messagesAvailable(final MessageProducer producer) {
PerfMark.startTask("ClientStreamListener.messagesAvailable", tag);
final Link link = PerfMark.link();
final class MessagesAvailable extends ContextRunnable {
MessagesAvailable() {
super(context);
}
@Override
public final void runInContext() {
public void runInContext() {
PerfMark.startTask("ClientCall$Listener.messagesAvailable", tag);
link.link();
try {
runInternal();
} finally {
PerfMark.stopTask("ClientCall$Listener.messagesAvailable", tag);
}
}
private void runInternal() {
if (closed) {
GrpcUtil.closeQuietly(producer);
return;
}
PerfMark.taskStart(tag, "ClientCall.messagesAvailable");
try {
InputStream message;
while ((message = producer.next()) != null) {
@ -573,13 +606,15 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
Status.CANCELLED.withCause(t).withDescription("Failed to read message.");
stream.cancel(status);
close(status, new Metadata());
} finally {
PerfMark.taskEnd(tag, "ClientCall.messagesAvailable");
}
}
}
callExecutor.execute(new MessagesAvailable());
try {
callExecutor.execute(new MessagesAvailable());
} finally {
PerfMark.stopTask("ClientStreamListener.messagesAvailable", tag);
}
}
/**
@ -603,6 +638,16 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
PerfMark.startTask("ClientStreamListener.closed", tag);
try {
closedInternal(status, rpcProgress, trailers);
} finally {
PerfMark.stopTask("ClientStreamListener.closed", tag);
}
}
private void closedInternal(
Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) {
Deadline deadline = effectiveDeadline();
if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
// When the server's deadline expires, it can only reset the stream with CANCEL and no
@ -616,23 +661,29 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
}
final Status savedStatus = status;
final Metadata savedTrailers = trailers;
final Link link = PerfMark.link();
final class StreamClosed extends ContextRunnable {
StreamClosed() {
super(context);
}
@Override
public final void runInContext() {
public void runInContext() {
PerfMark.startTask("ClientCall$Listener.onClose", tag);
link.link();
try {
runInternal();
} finally {
PerfMark.stopTask("ClientCall$Listener.onClose", tag);
}
}
private void runInternal() {
if (closed) {
// We intentionally don't keep the status or metadata from the server.
return;
}
PerfMark.taskStart(tag, "ClientCall.closed");
try {
close(savedStatus, savedTrailers);
} finally {
PerfMark.taskEnd(tag, "ClientCall.closed");
}
close(savedStatus, savedTrailers);
}
}
@ -641,14 +692,26 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void onReady() {
PerfMark.startTask("ClientStreamListener.onReady", tag);
final Link link = PerfMark.link();
final class StreamOnReady extends ContextRunnable {
StreamOnReady() {
super(context);
}
@Override
public final void runInContext() {
PerfMark.taskStart(tag, "ClientCall.onReady");
public void runInContext() {
PerfMark.startTask("ClientCall$Listener.onReady", tag);
link.link();
try {
runInternal();
} finally {
PerfMark.stopTask("ClientCall$Listener.onReady", tag);
}
}
private void runInternal() {
try {
observer.onReady();
} catch (Throwable t) {
@ -656,13 +719,15 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
Status.CANCELLED.withCause(t).withDescription("Failed to call onReady.");
stream.cancel(status);
close(status, new Metadata());
} finally {
PerfMark.taskEnd(tag, "ClientCall.onReady");
}
}
}
callExecutor.execute(new StreamOnReady());
try {
callExecutor.execute(new StreamOnReady());
} finally {
PerfMark.stopTask("ClientStreamListener.onReady", tag);
}
}
}
}

View File

@ -37,8 +37,8 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.perfmark.PerfMark;
import io.grpc.perfmark.PerfTag;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
import java.io.InputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -54,7 +54,7 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
private final ServerStream stream;
private final MethodDescriptor<ReqT, RespT> method;
private final PerfTag tag;
private final Tag tag;
private final Context.CancellableContext context;
private final byte[] messageAcceptEncoding;
private final DecompressorRegistry decompressorRegistry;
@ -71,31 +71,35 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method,
Metadata inboundHeaders, Context.CancellableContext context,
DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
CallTracer serverCallTracer) {
CallTracer serverCallTracer, Tag tag) {
this.stream = stream;
this.method = method;
// TODO(carl-mastrangelo): consider moving this to the ServerImpl to record startCall.
this.tag = PerfMark.createTag(method.getFullMethodName());
this.context = context;
this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY);
this.decompressorRegistry = decompressorRegistry;
this.compressorRegistry = compressorRegistry;
this.serverCallTracer = serverCallTracer;
this.serverCallTracer.reportCallStarted();
this.tag = tag;
}
@Override
public void request(int numMessages) {
stream.request(numMessages);
PerfMark.startTask("ServerCall.request", tag);
try {
stream.request(numMessages);
} finally {
PerfMark.stopTask("ServerCall.request", tag);
}
}
@Override
public void sendHeaders(Metadata headers) {
PerfMark.taskStart(tag, "ServerCall.sendHeaders");
PerfMark.startTask("ServerCall.sendHeaders", tag);
try {
sendHeadersInternal(headers);
} finally {
PerfMark.taskEnd(tag, "ServerCall.sendHeaders");
PerfMark.stopTask("ServerCall.sendHeaders", tag);
}
}
@ -140,11 +144,11 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override
public void sendMessage(RespT message) {
PerfMark.taskStart(tag, "ServerCall.sendMessage");
PerfMark.startTask("ServerCall.sendMessage", tag);
try {
sendMessageInternal(message);
} finally {
PerfMark.taskEnd(tag, "ServerCall.sendMessage");
PerfMark.stopTask("ServerCall.sendMessage", tag);
}
}
@ -193,11 +197,11 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override
public void close(Status status, Metadata trailers) {
PerfMark.taskStart(tag, "ServerCall.close");
PerfMark.startTask("ServerCall.close", tag);
try {
closeInternal(status, trailers);
} finally {
PerfMark.taskEnd(tag, "ServerCall.close");
PerfMark.stopTask("ServerCall.close", tag);
}
}
@ -281,15 +285,23 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
MoreExecutors.directExecutor());
}
@SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try
@Override
public void messagesAvailable(final MessageProducer producer) {
public void messagesAvailable(MessageProducer producer) {
PerfMark.startTask("ServerStreamListener.messagesAvailable", call.tag);
try {
messagesAvailableInternal(producer);
} finally {
PerfMark.stopTask("ServerStreamListener.messagesAvailable", call.tag);
}
}
@SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try
private void messagesAvailableInternal(final MessageProducer producer) {
if (call.cancelled) {
GrpcUtil.closeQuietly(producer);
return;
}
PerfMark.taskStart(call.tag, "ServerCall.messagesAvailable");
InputStream message;
try {
while ((message = producer.next()) != null) {
@ -305,58 +317,58 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
GrpcUtil.closeQuietly(producer);
Throwables.throwIfUnchecked(t);
throw new RuntimeException(t);
} finally {
PerfMark.taskEnd(call.tag, "ServerCall.messagesAvailable");
}
}
@Override
public void halfClosed() {
if (call.cancelled) {
return;
}
PerfMark.taskStart(call.tag, "ServerCall.halfClosed");
PerfMark.startTask("ServerStreamListener.halfClosed", call.tag);
try {
if (call.cancelled) {
return;
}
listener.onHalfClose();
} finally {
PerfMark.taskEnd(call.tag, "ServerCall.halfClosed");
PerfMark.stopTask("ServerStreamListener.halfClosed", call.tag);
}
}
@Override
public void closed(Status status) {
PerfMark.taskStart(call.tag, "ServerCall.closed");
PerfMark.startTask("ServerStreamListener.closed", call.tag);
try {
try {
if (status.isOk()) {
listener.onComplete();
} else {
call.cancelled = true;
listener.onCancel();
}
} finally {
// Cancel context after delivering RPC closure notification to allow the application to
// clean up and update any state based on whether onComplete or onCancel was called.
context.cancel(null);
closedInternal(status);
} finally {
PerfMark.stopTask("ServerStreamListener.closed", call.tag);
}
}
private void closedInternal(Status status) {
try {
if (status.isOk()) {
listener.onComplete();
} else {
call.cancelled = true;
listener.onCancel();
}
} finally {
PerfMark.taskEnd(call.tag, "ServerCall.closed");
// Cancel context after delivering RPC closure notification to allow the application to
// clean up and update any state based on whether onComplete or onCancel was called.
context.cancel(null);
}
}
@Override
public void onReady() {
if (call.cancelled) {
return;
}
PerfMark.taskStart(call.tag, "ServerCall.closed");
PerfMark.startTask("ServerStreamListener.onReady", call.tag);
try {
if (call.cancelled) {
return;
}
listener.onReady();
} finally {
PerfMark.taskEnd(call.tag, "ServerCall.closed");
PerfMark.stopTask("ServerCall.closed", call.tag);
}
}
}

View File

@ -50,6 +50,9 @@ import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerTransportFilter;
import io.grpc.Status;
import io.perfmark.Link;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
@ -460,9 +463,21 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
transportClosed(transport);
}
@Override
public void streamCreated(
final ServerStream stream, final String methodName, final Metadata headers) {
public void streamCreated(ServerStream stream, String methodName, Metadata headers) {
Tag tag = PerfMark.createTag(methodName, stream.hashCode());
PerfMark.startTask("ServerTransportListener.streamCreated", tag);
try {
streamCreatedInternal(stream, methodName, headers, tag);
} finally {
PerfMark.stopTask("ServerTransportListener.streamCreated", tag);
}
}
private void streamCreatedInternal(
final ServerStream stream, final String methodName, final Metadata headers, final Tag tag) {
if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
String encoding = headers.get(MESSAGE_ENCODING_KEY);
Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding);
@ -489,22 +504,33 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
wrappedExecutor = new SerializingExecutor(executor);
}
final Link link = PerfMark.link();
final JumpToApplicationThreadServerStreamListener jumpListener
= new JumpToApplicationThreadServerStreamListener(
wrappedExecutor, executor, stream, context);
wrappedExecutor, executor, stream, context, tag);
stream.setListener(jumpListener);
// Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks
// are delivered, including any errors. Callbacks can still be triggered, but they will be
// queued.
final class StreamCreated extends ContextRunnable {
StreamCreated() {
super(context);
}
@Override
public void runInContext() {
PerfMark.startTask("ServerTransportListener$StreamCreated.startCall", tag);
link.link();
try {
runInternal();
} finally {
PerfMark.stopTask("ServerTransportListener$StreamCreated.startCall", tag);
}
}
private void runInternal() {
ServerStreamListener listener = NOOP_LISTENER;
try {
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
@ -523,7 +549,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
context.cancel(null);
return;
}
listener = startCall(stream, methodName, method, headers, context, statsTraceCtx);
listener =
startCall(stream, methodName, method, headers, context, statsTraceCtx, tag);
} catch (RuntimeException e) {
stream.close(Status.fromThrowable(e), new Metadata());
context.cancel(null);
@ -573,7 +600,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
/** Never returns {@code null}. */
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
Context.CancellableContext context, StatsTraceContext statsTraceCtx) {
Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) {
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
statsTraceCtx.serverCallStarted(
new ServerCallInfoImpl<>(
@ -587,7 +614,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
ServerMethodDefinition<?, ?> wMethodDef = binlog == null
? interceptedDef : binlog.wrapMethodDefinition(interceptedDef);
return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context);
return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context, tag);
}
private <WReqT, WRespT> ServerStreamListener startWrappedCall(
@ -595,7 +622,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
ServerMethodDefinition<WReqT, WRespT> methodDef,
ServerStream stream,
Metadata headers,
Context.CancellableContext context) {
Context.CancellableContext context,
Tag tag) {
ServerCallImpl<WReqT, WRespT> call = new ServerCallImpl<>(
stream,
@ -604,7 +632,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
context,
decompressorRegistry,
compressorRegistry,
serverCallTracer);
serverCallTracer,
tag);
ServerCall.Listener<WReqT> listener =
methodDef.getServerCallHandler().startCall(call, headers);
@ -687,15 +716,17 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
private final Executor cancelExecutor;
private final Context.CancellableContext context;
private final ServerStream stream;
private final Tag tag;
// Only accessed from callExecutor.
private ServerStreamListener listener;
public JumpToApplicationThreadServerStreamListener(Executor executor,
Executor cancelExecutor, ServerStream stream, Context.CancellableContext context) {
Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) {
this.callExecutor = executor;
this.cancelExecutor = cancelExecutor;
this.stream = stream;
this.context = context;
this.tag = tag;
}
/**
@ -725,6 +756,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override
public void messagesAvailable(final MessageProducer producer) {
PerfMark.startTask("ServerStreamListener.messagesAvailable", tag);
final Link link = PerfMark.link();
final class MessagesAvailable extends ContextRunnable {
@ -734,6 +767,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override
public void runInContext() {
PerfMark.startTask("ServerCallListener(app).messagesAvailable", tag);
link.link();
try {
getListener().messagesAvailable(producer);
} catch (RuntimeException e) {
@ -742,15 +777,24 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
} catch (Error e) {
internalClose();
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag);
}
}
}
callExecutor.execute(new MessagesAvailable());
try {
callExecutor.execute(new MessagesAvailable());
} finally {
PerfMark.stopTask("ServerStreamListener.messagesAvailable", tag);
}
}
@Override
public void halfClosed() {
PerfMark.startTask("ServerStreamListener.halfClosed", tag);
final Link link = PerfMark.link();
final class HalfClosed extends ContextRunnable {
HalfClosed() {
super(context);
@ -758,6 +802,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override
public void runInContext() {
PerfMark.startTask("ServerCallListener(app).halfClosed", tag);
link.link();
try {
getListener().halfClosed();
} catch (RuntimeException e) {
@ -766,15 +812,30 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
} catch (Error e) {
internalClose();
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).halfClosed", tag);
}
}
}
callExecutor.execute(new HalfClosed());
try {
callExecutor.execute(new HalfClosed());
} finally {
PerfMark.stopTask("ServerStreamListener.halfClosed", tag);
}
}
@Override
public void closed(final Status status) {
PerfMark.startTask("ServerStreamListener.closed", tag);
try {
closedInternal(status);
} finally {
PerfMark.stopTask("ServerStreamListener.closed", tag);
}
}
private void closedInternal(final Status status) {
// For cancellations, promptly inform any users of the context that their work should be
// aborted. Otherwise, we can wait until pending work is done.
if (!status.isOk()) {
@ -782,6 +843,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
// is not serializing.
cancelExecutor.execute(new ContextCloser(context, status.getCause()));
}
final Link link = PerfMark.link();
final class Closed extends ContextRunnable {
Closed() {
@ -790,7 +852,13 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override
public void runInContext() {
getListener().closed(status);
PerfMark.startTask("ServerCallListener(app).closed", tag);
link.link();
try {
getListener().closed(status);
} finally {
PerfMark.stopTask("ServerCallListener(app).closed", tag);
}
}
}
@ -799,6 +867,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override
public void onReady() {
PerfMark.startTask("ServerStreamListener.onReady", tag);
final Link link = PerfMark.link();
final class OnReady extends ContextRunnable {
OnReady() {
super(context);
@ -806,6 +876,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override
public void runInContext() {
PerfMark.startTask("ServerCallListener(app).onReady", tag);
link.link();
try {
getListener().onReady();
} catch (RuntimeException e) {
@ -814,11 +886,17 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
} catch (Error e) {
internalClose();
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).onReady", tag);
}
}
}
callExecutor.execute(new OnReady());
try {
callExecutor.execute(new OnReady());
} finally {
PerfMark.stopTask("ServerStreamListener.onReady", tag);
}
}
}

View File

@ -1,49 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.perfmark;
/**
* Internal {@link PerfTag.TagFactory} and {@link PerfMarkTask} accessor. This is intended for use
* by io.grpc.perfmark, and the specifically supported packages that utilize PerfMark. If you
* *really* think you need to use this, contact the gRPC team first.
*/
public final class InternalPerfMark {
private InternalPerfMark() {}
/** Expose class to allow packages that utilize PerfMark to get PerfMarkTask instances. */
public abstract static class InternalPerfMarkTask extends PerfMarkTask {
public InternalPerfMarkTask() {}
}
/** Expose methods that create PerfTag to packages that utilize PerfMark. */
private static final long NULL_NUMERIC_TAG = 0;
private static final String NULL_STRING_TAG = null;
public static PerfTag createPerfTag(long numericTag, String stringTag) {
return PerfTag.TagFactory.create(numericTag, stringTag);
}
public static PerfTag createPerfTag(String stringTag) {
return PerfTag.TagFactory.create(NULL_NUMERIC_TAG, stringTag);
}
public static PerfTag createPerfTag(long numericTag) {
return PerfTag.TagFactory.create(numericTag, NULL_STRING_TAG);
}
}

View File

@ -1,171 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.perfmark;
import com.google.errorprone.annotations.CompileTimeConstant;
import io.grpc.perfmark.PerfTag.TagFactory;
/**
* PerfMark is a collection of stub methods for marking key points in the RPC lifecycle. This
* class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this yet.
*/
public final class PerfMark {
private PerfMark() {
throw new AssertionError("nope");
}
/**
* Start a Task with a Tag to identify it; a task represents some work that spans some time, and
* you are interested in both its start time and end time.
*
* @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't
* use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task
* does not have a numeric tag associated. In this case, you are encouraged to use {@link
* #taskStart(String)} or {@link PerfTag#create(String)}.
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this task.
*/
public static void taskStart(PerfTag tag, @CompileTimeConstant String taskName) {}
/**
* Start a Task; a task represents some work that spans some time, and you are interested in both
* its start time and end time.
*
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this task.
*/
public static void taskStart(@CompileTimeConstant String taskName) {}
/**
* End a Task with a Tag to identify it; a task represents some work that spans some time, and
* you are interested in both its start time and end time.
*
* @param tag a Tag object associated with the task start. This should be the tag used for the
* corresponding {@link #taskStart(PerfTag, String)} call.
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this task. This should
* be the name used by the corresponding {@link #taskStart(PerfTag, String)} call.
*/
public static void taskEnd(PerfTag tag, @CompileTimeConstant String taskName) {}
/**
* End a Task with a Tag to identify it; a task represents some work that spans some time, and
* you are interested in both its start time and end time.
*
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this task. This should
* be the name used by the corresponding {@link #taskStart(String)} call.
*/
public static void taskEnd(@CompileTimeConstant String taskName) {}
/**
* Start a Task with a Tag to identify it in a try-with-resource statement; a task represents some
* work that spans some time, and you are interested in both its start time and end time.
*
* <p>Use this in a try-with-resource statement so that task will end automatically.
*
* @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't
* use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task
* does not have a numeric tag associated. In this case, you are encouraged to use {@link
* #task(String)} or {@link PerfTag#create(String)}.
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this task.
*/
public static PerfMarkTask task(PerfTag tag, @CompileTimeConstant String taskName) {
return NoopTask.INSTANCE;
}
/**
* Start a Task it in a try-with-resource statement; a task represents some work that spans some
* time, and you are interested in both its start time and end time.
*
* <p>Use this in a try-with-resource statement so that task will end automatically.
*
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this task.
*/
public static PerfMarkTask task(@CompileTimeConstant String taskName) {
return NoopTask.INSTANCE;
}
/**
* Records an Event with a Tag to identify it.
*
* <p>An Event is different from a Task in that you don't care how much time it spanned. You are
* interested in only the time it happened.
*
* @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't
* use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task
* does not have a numeric tag associated. In this case, you are encouraged to use {@link
* #event(String)} or {@link PerfTag#create(String)}.
* @param eventName The name of the event. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this event.
*/
public static void event(PerfTag tag, @CompileTimeConstant String eventName) {}
/**
* Records an Event.
*
* <p>An Event is different from a Task in that you don't care how much time it spanned. You are
* interested in only the time it happened.
*
* @param eventName The name of the event. <b>This parameter must be a compile-time constant!</b>
* Otherwise, instrumentation result will show "(invalid name)" for this event.
*/
public static void event(@CompileTimeConstant String eventName) {}
/**
* If PerfMark instrumentation is not enabled, returns a Tag with numericTag = 0L. Replacement
* for {@link TagFactory#create(long, String)} if PerfMark agent is enabled.
*
*/
public static PerfTag createTag(
@SuppressWarnings("unused") long numericTag, @SuppressWarnings("unused") String stringTag) {
// Warning suppression is safe as this method returns by default the NULL_PERF_TAG
return NULL_PERF_TAG;
}
/**
* If PerfMark instrumentation is not enabled returns a Tag with numericTag = 0L. Replacement
* for {@link TagFactory#create(String)} if PerfMark agent is enabled.
*/
public static PerfTag createTag(@SuppressWarnings("unused") String stringTag) {
// Warning suppression is safe as this method returns by default the NULL_PERF_TAG
return NULL_PERF_TAG;
}
/**
* If PerfMark instrumentation is not enabled returns a Tag with numericTag = 0L. Replacement
* for {@link TagFactory#create(long)} if PerfMark agent is enabled.
*/
public static PerfTag createTag(@SuppressWarnings("unused") long numericTag) {
// Warning suppression is safe as this method returns by default the NULL_PERF_TAG
return NULL_PERF_TAG;
}
private static final PerfTag NULL_PERF_TAG = TagFactory.create();
private static final class NoopTask extends PerfMarkTask {
private static final PerfMarkTask INSTANCE = new NoopTask();
NoopTask() {}
@Override
public void close() {}
}
}

View File

@ -1,31 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.perfmark;
import java.io.Closeable;
/**
* This class exists to make it easier for users to use the try-with-resource shorthand for
* starting and ending a PerfMark Task. This class is {@link io.grpc.Internal} and
* {@link io.grpc.ExperimentalApi}. Do not use this yet.
*/
public abstract class PerfMarkTask implements Closeable {
@Override
public abstract void close();
PerfMarkTask() {}
}

View File

@ -1,71 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.perfmark;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation to add PerfMark instrumentation points surrounding method invocation.
*
* <p>This class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this
* yet.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
// TODO(carl-mastrangelo): Add this line back in and make it okay on Android
//@IncompatibleModifiers(value = {Modifier.ABSTRACT, Modifier.NATIVE})
public @interface PerfMarker {
/**
* The name of the task; e.g. `parseHeaders`.
*/
String taskName();
/**
* An optional computed tag.
*
* <p>There are 3 supported references that can be used
* <ul>
* <li>{@code "this"}: Then the tag will be the {@link Object#toString} of the current class.
* Only valid for instance methods.
* <li>{@code "someFieldName"}: Then the tag will be the result of
* calling {@link String#valueOf(Object)} on the field. The field cannot be a primitive or
* and array type. (Though we may revisit this in the future).
* <li>{@code "$N"}: Then the tag will be the result of calling {@link String#valueOf(Object)}
* on the Nth method parameter. Parameters are {@code 0} indexed so {@code "$1"} is the
* second parameter. The referenced parameter cannot be a primitive or an array type.
* (Though we may revisit this in the future).
* </ul>
*
* <p>In general you should reference either {@code "this"} or {@code final} fields since
* in these cases we can cache the operations to decrease the cost of computing the tags. A side
* effect of this is that for such references we will not have their tags recalculated after the
* first time. Thus it is best to use immutable objects for tags.
*/
String computedTag() default "";
/**
* True if class with annotation is immutable and instrumentation must adhere to this restriction.
* If enableSampling is passed as argument to the agent, instrumentation points with <code>
* immutable = true </code> are ignored.
*/
boolean immutable() default false;
}

View File

@ -1,118 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.perfmark;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
/**
* A Tag is used to provide additional information to identify a task and consists of a 64-bit
* integer value and a string.
*
* <p>Both the {@code numericTag} and the {@code stringTag} are optional. The {@code numericTag}
* value can be used to identify the specific task being worked on (e.g. the id of the rpc call).
* The {@code stringTag} can be used to store any value that is not a compile-time constant (a
* restriction imposed for the name passed to PerfMark tasks and events). A value of 0 for the
* {@code numericTag} is considered null. Don't use 0 for the {@code numericTag} unless you intend
* to specify null. In that case you are encouraged to use {@link #create(String)}.
*
* <p>Invocations to {@code create} methods in this class are a no-op unless PerfMark
* instrumentation is enabled. If so, calls to {@code create} methods to this class are replaced for
* calls to {@link TagFactory} create methods.
*
* <p>This class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this
* yet.
*/
@Immutable
public final class PerfTag {
private static final long NULL_NUMERIC_TAG = 0;
private static final String NULL_STRING_TAG = null;
private final long numericTag;
private final String stringTag;
private PerfTag(long numericTag, @Nullable String stringTag) {
this.numericTag = numericTag;
this.stringTag = stringTag;
}
/** Returns the numeric tag if set, or {@link #NULL_NUMERIC_TAG} instead. */
public long getNumericTag() {
return numericTag;
}
/** Returns the string tag if set, or {@link #NULL_STRING_TAG} instead. */
@Nullable public String getStringTag() {
return stringTag;
}
@Override
public String toString() {
return "Tag(numericTag=" + numericTag + ",stringTag='" + stringTag + "')";
}
@Override
public int hashCode() {
int longHashCode = (int)(numericTag ^ (numericTag >>> 32));
return longHashCode + (stringTag != null ? stringTag.hashCode() : 31);
}
@Override
@SuppressWarnings("ReferenceEquality") // No Java 8 yet.
public boolean equals(Object obj) {
if (!(obj instanceof PerfTag)) {
return false;
}
PerfTag that = (PerfTag) obj;
return numericTag == that.numericTag
&& (stringTag == that.stringTag || (stringTag != null && stringTag.equals(that.stringTag)));
}
/**
* Provides methods that create Tag instances which should not be directly invoked by clients.
*
* <p>Calls to {@link PerfMark#create(long)}, {@link PerfMark#create(long, String)} and {@link
* PerfMark#create(String)} are replaced with calls to the methods in this class using bytecode
* rewriting, if enabled.
*/
static final class TagFactory {
/**
* This class should not be instantiated.
*/
private TagFactory() {
throw new AssertionError("nope");
}
public static PerfTag create(long numericTag, String stringTag) {
return new PerfTag(numericTag, stringTag);
}
public static PerfTag create(String stringTag) {
return new PerfTag(NULL_NUMERIC_TAG, stringTag);
}
public static PerfTag create(long numericTag) {
return new PerfTag(numericTag, NULL_STRING_TAG);
}
static PerfTag create() {
return new PerfTag(NULL_NUMERIC_TAG, NULL_STRING_TAG);
}
}
}

View File

@ -1,24 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This is an internal, experimental API and not subject to the normal compatibility guarantees.
*
* @see io.grpc.Internal
*/
@javax.annotation.CheckReturnValue
@javax.annotation.ParametersAreNonnullByDefault
package io.grpc.perfmark;

View File

@ -43,6 +43,7 @@ import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl;
import io.grpc.internal.testing.SingleMessageProducer;
import io.perfmark.PerfMark;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
@ -90,7 +91,7 @@ public class ServerCallImplTest {
context = Context.ROOT.withCancellation();
call = new ServerCallImpl<>(stream, UNARY_METHOD, requestHeaders, context,
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
serverCallTracer);
serverCallTracer, PerfMark.createTag());
}
@Test
@ -113,7 +114,7 @@ public class ServerCallImplTest {
call = new ServerCallImpl<>(stream, UNARY_METHOD, requestHeaders, context,
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
tracer);
tracer, PerfMark.createTag());
// required boilerplate
call.sendHeaders(new Metadata());
@ -224,7 +225,8 @@ public class ServerCallImplTest {
context,
DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(),
serverCallTracer);
serverCallTracer,
PerfMark.createTag());
serverCall.sendHeaders(new Metadata());
serverCall.sendMessage(1L);
verify(stream, times(1)).writeMessage(any(InputStream.class));
@ -258,7 +260,8 @@ public class ServerCallImplTest {
context,
DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(),
serverCallTracer);
serverCallTracer,
PerfMark.createTag());
serverCall.sendHeaders(new Metadata());
serverCall.sendMessage(1L);
serverCall.sendMessage(1L);
@ -295,7 +298,8 @@ public class ServerCallImplTest {
context,
DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(),
serverCallTracer);
serverCallTracer,
PerfMark.createTag());
serverCall.close(Status.OK, new Metadata());
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(stream, times(1)).cancel(statusCaptor.capture());

View File

@ -77,6 +77,7 @@ import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener;
import io.grpc.internal.testing.SingleMessageProducer;
import io.grpc.internal.testing.TestServerStreamTracer;
import io.grpc.util.MutableHandlerRegistry;
import io.perfmark.PerfMark;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
@ -1140,7 +1141,8 @@ public class ServerImplTest {
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation());
Context.ROOT.withCancellation(),
PerfMark.createTag());
ServerStreamListener mockListener = mock(ServerStreamListener.class);
listener.setListener(mockListener);
@ -1165,7 +1167,8 @@ public class ServerImplTest {
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation());
Context.ROOT.withCancellation(),
PerfMark.createTag());
ServerStreamListener mockListener = mock(ServerStreamListener.class);
listener.setListener(mockListener);
@ -1190,7 +1193,8 @@ public class ServerImplTest {
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation());
Context.ROOT.withCancellation(),
PerfMark.createTag());
ServerStreamListener mockListener = mock(ServerStreamListener.class);
listener.setListener(mockListener);
@ -1213,7 +1217,8 @@ public class ServerImplTest {
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation());
Context.ROOT.withCancellation(),
PerfMark.createTag());
ServerStreamListener mockListener = mock(ServerStreamListener.class);
listener.setListener(mockListener);
@ -1236,7 +1241,8 @@ public class ServerImplTest {
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation());
Context.ROOT.withCancellation(),
PerfMark.createTag());
ServerStreamListener mockListener = mock(ServerStreamListener.class);
listener.setListener(mockListener);
@ -1259,7 +1265,8 @@ public class ServerImplTest {
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation());
Context.ROOT.withCancellation(),
PerfMark.createTag());
ServerStreamListener mockListener = mock(ServerStreamListener.class);
listener.setListener(mockListener);

View File

@ -25,5 +25,6 @@ java_library(
"@io_netty_netty_handler_proxy//jar",
"@io_netty_netty_resolver//jar",
"@io_netty_netty_transport//jar",
"@io_perfmark_perfmark_api//jar",
],
)

View File

@ -43,6 +43,7 @@ import io.netty.channel.EventLoop;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString;
import io.perfmark.PerfMark;
import javax.annotation.Nullable;
/**
@ -114,8 +115,18 @@ class NettyClientStream extends AbstractClientStream {
}
private class Sink implements AbstractClientStream.Sink {
@Override
public void writeHeaders(Metadata headers, byte[] requestPayload) {
PerfMark.startTask("NettyClientStream$Sink.writeHeaders");
try {
writeHeadersInternal(headers, requestPayload);
} finally {
PerfMark.stopTask("NettyClientStream$Sink.writeHeaders");
}
}
private void writeHeadersInternal(Metadata headers, byte[] requestPayload) {
// Convert the headers into Netty HTTP/2 headers.
AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method);
if (defaultPath == null) {
@ -152,15 +163,13 @@ class NettyClientStream extends AbstractClientStream {
}
}
};
// Write the command requesting the creation of the stream.
writeQueue.enqueue(
new CreateStreamCommand(http2Headers, transportState(), shouldBeCountedForInUse(), get),
!method.getType().clientSendsOneMessage() || get).addListener(failureListener);
}
@Override
public void writeFrame(
private void writeFrameInternal(
WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) {
Preconditions.checkArgument(numMessages >= 0);
ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf();
@ -184,12 +193,23 @@ class NettyClientStream extends AbstractClientStream {
});
} else {
// The frame is empty and will not impact outbound flow control. Just send it.
writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush);
writeQueue.enqueue(
new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush);
}
}
@Override
public void request(final int numMessages) {
public void writeFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
PerfMark.startTask("NettyClientStream$Sink.writeFrame");
try {
writeFrameInternal(frame, endOfStream, flush, numMessages);
} finally {
PerfMark.stopTask("NettyClientStream$Sink.writeFrame");
}
}
private void requestInternal(final int numMessages) {
if (channel.eventLoop().inEventLoop()) {
// Processing data read in the event loop so can call into the deframer immediately
transportState().requestMessagesFromDeframer(numMessages);
@ -203,9 +223,24 @@ class NettyClientStream extends AbstractClientStream {
}
}
@Override
public void request(int numMessages) {
PerfMark.startTask("NettyClientStream$Sink.request");
try {
requestInternal(numMessages);
} finally {
PerfMark.stopTask("NettyClientStream$Sink.request");
}
}
@Override
public void cancel(Status status) {
writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true);
PerfMark.startTask("NettyClientStream$Sink.cancel");
try {
writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true);
} finally {
PerfMark.stopTask("NettyClientStream$Sink.cancel");
}
}
}

View File

@ -33,6 +33,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.perfmark.PerfMark;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -91,8 +92,8 @@ class NettyServerStream extends AbstractServerStream {
}
private class Sink implements AbstractServerStream.Sink {
@Override
public void request(final int numMessages) {
private void requestInternal(final int numMessages) {
if (channel.eventLoop().inEventLoop()) {
// Processing data read in the event loop so can call into the deframer immediately
transportState().requestMessagesFromDeframer(numMessages);
@ -107,16 +108,30 @@ class NettyServerStream extends AbstractServerStream {
}
@Override
public void writeHeaders(Metadata headers) {
writeQueue.enqueue(
SendResponseHeadersCommand.createHeaders(
transportState(),
Utils.convertServerHeaders(headers)),
true);
public void request(final int numMessages) {
PerfMark.startTask("NettyServerStream$Sink.request");
try {
requestInternal(numMessages);
} finally {
PerfMark.stopTask("NettyServerStream$Sink.request");
}
}
@Override
public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) {
public void writeHeaders(Metadata headers) {
PerfMark.startTask("NettyServerStream$Sink.writeHeaders");
try {
writeQueue.enqueue(
SendResponseHeadersCommand.createHeaders(
transportState(),
Utils.convertServerHeaders(headers)),
true);
} finally {
PerfMark.stopTask("NettyServerStream$Sink.writeHeaders");
}
}
private void writeFrameInternal(WritableBuffer frame, boolean flush, final int numMessages) {
Preconditions.checkArgument(numMessages >= 0);
if (frame == null) {
writeQueue.scheduleFlush();
@ -140,17 +155,37 @@ class NettyServerStream extends AbstractServerStream {
});
}
@Override
public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) {
PerfMark.startTask("NettyServerStream$Sink.writeFrame");
try {
writeFrameInternal(frame, flush, numMessages);
} finally {
PerfMark.stopTask("NettyServerStream$Sink.writeFrame");
}
}
@Override
public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent);
writeQueue.enqueue(
SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status),
true);
PerfMark.startTask("NettyServerStream$Sink.writeTrailers");
try {
Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent);
writeQueue.enqueue(
SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status),
true);
} finally {
PerfMark.stopTask("NettyServerStream$Sink.writeTrailers");
}
}
@Override
public void cancel(Status status) {
writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true);
PerfMark.startTask("NettyServerStream$Sink.cancel");
try {
writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true);
} finally {
PerfMark.startTask("NettyServerStream$Sink.cancel");
}
}
}

View File

@ -21,6 +21,8 @@ import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.DefaultByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.perfmark.Link;
import io.perfmark.PerfMark;
/**
* Command sent from the transport to the Netty channel to send a GRPC frame to the remote endpoint.
@ -28,6 +30,7 @@ import io.netty.channel.ChannelPromise;
final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.QueuedCommand {
private final StreamIdHolder stream;
private final boolean endStream;
private final Link link;
private ChannelPromise promise;
@ -35,6 +38,12 @@ final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQu
super(content);
this.stream = stream;
this.endStream = endStream;
this.link = PerfMark.link();
}
@Override
public Link getLink() {
return link;
}
int streamId() {

View File

@ -22,6 +22,8 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.perfmark.Link;
import io.perfmark.PerfMark;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@ -104,26 +106,44 @@ class WriteQueue {
* called in the event loop
*/
private void flush() {
PerfMark.startTask("WriteQueue.periodicFlush");
try {
QueuedCommand cmd;
int i = 0;
boolean flushedOnce = false;
while ((cmd = queue.poll()) != null) {
cmd.run(channel);
PerfMark.startTask("WriteQueue.run");
try {
cmd.getLink().link();
cmd.run(channel);
} finally {
PerfMark.stopTask("WriteQueue.run");
}
if (++i == DEQUE_CHUNK_SIZE) {
i = 0;
// Flush each chunk so we are releasing buffers periodically. In theory this loop
// might never end as new events are continuously added to the queue, if we never
// flushed in that case we would be guaranteed to OOM.
channel.flush();
PerfMark.startTask("WriteQueue.flush0");
try {
channel.flush();
} finally {
PerfMark.stopTask("WriteQueue.flush0");
}
flushedOnce = true;
}
}
// Must flush at least once, even if there were no writes.
if (i != 0 || !flushedOnce) {
channel.flush();
PerfMark.startTask("WriteQueue.flush1");
try {
channel.flush();
} finally {
PerfMark.stopTask("WriteQueue.flush1");
}
}
} finally {
PerfMark.stopTask("WriteQueue.periodicFlush");
// Mark the write as done, if the queue is non-empty after marking trigger a new write.
scheduled.set(false);
if (!queue.isEmpty()) {
@ -134,8 +154,10 @@ class WriteQueue {
private static class RunnableCommand implements QueuedCommand {
private final Runnable runnable;
private final Link link;
public RunnableCommand(Runnable runnable) {
this.link = PerfMark.link();
this.runnable = runnable;
}
@ -153,11 +175,21 @@ class WriteQueue {
public final void run(Channel channel) {
runnable.run();
}
@Override
public Link getLink() {
return link;
}
}
abstract static class AbstractQueuedCommand implements QueuedCommand {
private ChannelPromise promise;
private final Link link;
AbstractQueuedCommand() {
this.link = PerfMark.link();
}
@Override
public final void promise(ChannelPromise promise) {
@ -173,6 +205,11 @@ class WriteQueue {
public final void run(Channel channel) {
channel.write(this, promise);
}
@Override
public Link getLink() {
return link;
}
}
/**
@ -190,5 +227,7 @@ class WriteQueue {
void promise(ChannelPromise promise);
void run(Channel channel);
Link getLink();
}
}

View File

@ -35,6 +35,7 @@ def grpc_java_repositories(
omit_io_netty_tcnative_boringssl_static = False,
omit_io_opencensus_api = False,
omit_io_opencensus_grpc_metrics = False,
omit_io_perfmark = False,
omit_javax_annotation = False,
omit_junit_junit = False,
omit_net_zlib = False,
@ -103,6 +104,8 @@ def grpc_java_repositories(
io_opencensus_api()
if not omit_io_opencensus_grpc_metrics:
io_opencensus_grpc_metrics()
if not omit_io_perfmark:
io_perfmark()
if not omit_javax_annotation:
javax_annotation()
if not omit_junit_junit:
@ -402,6 +405,15 @@ def io_opencensus_grpc_metrics():
licenses = ["notice"], # Apache 2.0
)
def io_perfmark():
jvm_maven_import_external(
name = "io_perfmark_perfmark_api",
artifact = "io.perfmark:perfmark-api:0.16.0",
server_urls = ["http://central.maven.org/maven2"],
artifact_sha256 = "a93667875ea9d10315177768739a18d6c667df041c982d2841645ae8558d0af0",
licenses = ["notice"], # Apache 2.0
)
def javax_annotation():
# Use //stub:javax_annotation for neverlink=1 support.
jvm_maven_import_external(