diff --git a/build.gradle b/build.gradle index d7db6ebed9..c7b00125c3 100644 --- a/build.gradle +++ b/build.gradle @@ -190,7 +190,7 @@ subprojects { google_auth_credentials: 'com.google.auth:google-auth-library-credentials:0.4.0', okhttp: 'com.squareup.okhttp:okhttp:2.5.0', okio: 'com.squareup.okio:okio:1.6.0', - opencensus_api: 'io.opencensus:opencensus-api:0.5.1', + opencensus_api: 'io.opencensus:opencensus-api:0.6.0', instrumentation_api: 'com.google.instrumentation:instrumentation-api:0.4.3', protobuf: "com.google.protobuf:protobuf-java:${protobufVersion}", protobuf_lite: "com.google.protobuf:protobuf-lite:3.0.1", diff --git a/core/build.gradle b/core/build.gradle index 81ceffffd7..b6cd5f51fa 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -12,6 +12,8 @@ dependencies { exclude group: 'io.grpc', module: 'grpc-context' } compile (libraries.opencensus_api) { + // prefer 3.0.0 from libraries instead of 3.0.1 + exclude group: 'com.google.code.findbugs', module: 'jsr305' // prefer 2.0.19 from libraries instead of 2.0.11 exclude group: 'com.google.errorprone', module: 'error_prone_annotations' // we'll always be more up-to-date diff --git a/core/src/main/java/io/grpc/StreamTracer.java b/core/src/main/java/io/grpc/StreamTracer.java index bf1b33f5f4..3f78c7867e 100644 --- a/core/src/main/java/io/grpc/StreamTracer.java +++ b/core/src/main/java/io/grpc/StreamTracer.java @@ -36,18 +36,68 @@ public abstract class StreamTracer { * An outbound message has been passed to the stream. This is called as soon as the stream knows * about the message, but doesn't have further guarantee such as whether the message is serialized * or not. + * + * @deprecated use {@link #outboundMessage(int)} */ + @Deprecated public void outboundMessage() { } + /** + * An outbound message has been passed to the stream. This is called as soon as the stream knows + * about the message, but doesn't have further guarantee such as whether the message is serialized + * or not. + * + * @param seqNo the sequential number of the message within the stream, starting from 0. It can + * be used to correlate with {@link #outboundMessageSent} for the same message. + */ + public void outboundMessage(int seqNo) { + } + /** * An inbound message has been received by the stream. This is called as soon as the stream knows * about the message, but doesn't have further guarantee such as whether the message is * deserialized or not. + * + * @deprecated use {@link #inboundMessage(int)} */ + @Deprecated public void inboundMessage() { } + /** + * An inbound message has been received by the stream. This is called as soon as the stream knows + * about the message, but doesn't have further guarantee such as whether the message is + * deserialized or not. + * + * @param seqNo the sequential number of the message within the stream, starting from 0. It can + * be used to correlate with {@link #inboundMessageRead} for the same message. + */ + public void inboundMessage(int seqNo) { + } + + /** + * An outbound message has been serialized and sent to the transport. + * + * @param seqNo the sequential number of the message within the stream, starting from 0. It can + * be used to correlate with {@link #outboundMessage(int)} for the same message. + * @param optionalWireSize the wire size of the message. -1 if unknown + * @param optionalUncompressedSize the uncompressed serialized size of the message. -1 if unknown + */ + public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + } + + /** + * An inbound message has been fully read from the transport. + * + * @param seqNo the sequential number of the message within the stream, starting from 0. It can + * be used to correlate with {@link #inboundMessage(int)} for the same message. + * @param optionalWireSize the wire size of the message. -1 if unknown + * @param optionalUncompressedSize the uncompressed serialized size of the message. -1 if unknown + */ + public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + } + /** * The wire size of some outbound data is revealed. This can only used to record the accumulative * outbound wire size. There is no guarantee wrt timing or granularity of this method. diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 2ba8a78dcc..d6af1f27be 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -359,7 +359,8 @@ public abstract class AbstractClientStream extends AbstractStream } catch (java.io.IOException ex) { throw new RuntimeException(ex); } - statsTraceCtx.outboundMessage(); + statsTraceCtx.outboundMessage(0); + statsTraceCtx.outboundMessageSent(0, payload.length, payload.length); statsTraceCtx.outboundUncompressedSize(payload.length); // NB(zhangkun83): this is not accurate, because the underlying transport will probably encode // it using e.g., base64. However, we are not supposed to know such detail here. diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java index d917b6b1a7..7d4cbcce14 100644 --- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java +++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java @@ -161,12 +161,12 @@ final class CensusStatsModule { } @Override - public void inboundMessage() { + public void inboundMessage(int seqNo) { inboundMessageCount.incrementAndGet(); } @Override - public void outboundMessage() { + public void outboundMessage(int seqNo) { outboundMessageCount.incrementAndGet(); } } @@ -282,12 +282,12 @@ final class CensusStatsModule { } @Override - public void inboundMessage() { + public void inboundMessage(int seqNo) { inboundMessageCount.incrementAndGet(); } @Override - public void outboundMessage() { + public void outboundMessage(int seqNo) { outboundMessageCount.incrementAndGet(); } diff --git a/core/src/main/java/io/grpc/internal/CensusTracingModule.java b/core/src/main/java/io/grpc/internal/CensusTracingModule.java index 8e31c98985..7520b24bf4 100644 --- a/core/src/main/java/io/grpc/internal/CensusTracingModule.java +++ b/core/src/main/java/io/grpc/internal/CensusTracingModule.java @@ -33,6 +33,7 @@ import io.grpc.MethodDescriptor; import io.grpc.ServerStreamTracer; import io.grpc.StreamTracer; import io.opencensus.trace.EndSpanOptions; +import io.opencensus.trace.NetworkEvent; import io.opencensus.trace.Span; import io.opencensus.trace.SpanContext; import io.opencensus.trace.Status; @@ -56,8 +57,6 @@ import javax.annotation.Nullable; */ final class CensusTracingModule { private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName()); - // TODO(zhangkun83): record NetworkEvent to Span for each message - private static final ClientStreamTracer noopClientTracer = new ClientStreamTracer() {}; private final Tracer censusTracer; @VisibleForTesting @@ -182,6 +181,19 @@ final class CensusTracingModule { return EndSpanOptions.builder().setStatus(convertStatus(status)).build(); } + private static void recordNetworkEvent( + Span span, NetworkEvent.Type type, + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + NetworkEvent.Builder eventBuilder = NetworkEvent.builder(type, seqNo); + if (optionalUncompressedSize != -1) { + eventBuilder.setUncompressedMessageSize(optionalUncompressedSize); + } + if (optionalWireSize != -1) { + eventBuilder.setCompressedMessageSize(optionalWireSize); + } + span.addNetworkEvent(eventBuilder.build()); + } + @VisibleForTesting final class ClientCallTracer extends ClientStreamTracer.Factory { @@ -201,7 +213,7 @@ final class CensusTracingModule { public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { headers.discardAll(tracingHeader); headers.put(tracingHeader, span.getContext()); - return noopClientTracer; + return new ClientTracer(span); } /** @@ -218,6 +230,28 @@ final class CensusTracingModule { } } + private static final class ClientTracer extends ClientStreamTracer { + private final Span span; + + ClientTracer(Span span) { + this.span = checkNotNull(span, "span"); + } + + @Override + public void outboundMessageSent( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + recordNetworkEvent( + span, NetworkEvent.Type.SENT, seqNo, optionalWireSize, optionalUncompressedSize); + } + + @Override + public void inboundMessageRead( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + recordNetworkEvent( + span, NetworkEvent.Type.RECV, seqNo, optionalWireSize, optionalUncompressedSize); + } + } + private final class ServerTracer extends ServerStreamTracer { private final Span span; private final AtomicBoolean streamClosed = new AtomicBoolean(false); @@ -252,6 +286,20 @@ final class CensusTracingModule { // inherit from the parent Context. return context.withValue(CONTEXT_SPAN_KEY, span); } + + @Override + public void outboundMessageSent( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + recordNetworkEvent( + span, NetworkEvent.Type.SENT, seqNo, optionalWireSize, optionalUncompressedSize); + } + + @Override + public void inboundMessageRead( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + recordNetworkEvent( + span, NetworkEvent.Type.RECV, seqNo, optionalWireSize, optionalUncompressedSize); + } } @VisibleForTesting diff --git a/core/src/main/java/io/grpc/internal/MessageDeframer.java b/core/src/main/java/io/grpc/internal/MessageDeframer.java index f368e2cf1e..3274d7babe 100644 --- a/core/src/main/java/io/grpc/internal/MessageDeframer.java +++ b/core/src/main/java/io/grpc/internal/MessageDeframer.java @@ -94,6 +94,7 @@ public class MessageDeframer implements Closeable, Deframer { private CompositeReadableBuffer unprocessed = new CompositeReadableBuffer(); private long pendingDeliveries; private boolean inDelivery = false; + private int currentMessageSeqNo = -1; private boolean closeWhenComplete = false; private volatile boolean stopDelivery = false; @@ -317,7 +318,8 @@ public class MessageDeframer implements Closeable, Deframer { .asRuntimeException(); } - statsTraceCtx.inboundMessage(); + currentMessageSeqNo++; + statsTraceCtx.inboundMessage(currentMessageSeqNo); // Continue reading the frame body. state = State.BODY; } @@ -326,6 +328,10 @@ public class MessageDeframer implements Closeable, Deframer { * Processes the GRPC message body, which depending on frame header flags may be compressed. */ private void processBody() { + // There is no reliable way to get the uncompressed size per message when it's compressed, + // because the uncompressed bytes are provided through an InputStream whose total size is + // unknown until all bytes are read, and we don't know when it happens. + statsTraceCtx.inboundMessageRead(currentMessageSeqNo, requiredLength, -1); InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody(); nextFrame = null; listener.messagesAvailable(new SingleMessageProducer(stream)); diff --git a/core/src/main/java/io/grpc/internal/MessageFramer.java b/core/src/main/java/io/grpc/internal/MessageFramer.java index 665f84eeea..e401005794 100644 --- a/core/src/main/java/io/grpc/internal/MessageFramer.java +++ b/core/src/main/java/io/grpc/internal/MessageFramer.java @@ -75,6 +75,10 @@ public class MessageFramer implements Framer { private final StatsTraceContext statsTraceCtx; private boolean closed; + // Tracing and stats-related states + private int currentMessageSeqNo = -1; + private long currentMessageWireSize; + /** * Creates a {@code MessageFramer}. * @@ -114,7 +118,9 @@ public class MessageFramer implements Framer { @Override public void writePayload(InputStream message) { verifyNotClosed(); - statsTraceCtx.outboundMessage(); + currentMessageSeqNo++; + currentMessageWireSize = 0; + statsTraceCtx.outboundMessage(currentMessageSeqNo); boolean compressed = messageCompression && compressor != Codec.Identity.NONE; int written = -1; int messageLength = -2; @@ -143,11 +149,13 @@ public class MessageFramer implements Framer { throw Status.INTERNAL.withDescription(err).asRuntimeException(); } statsTraceCtx.outboundUncompressedSize(written); + statsTraceCtx.outboundWireSize(currentMessageWireSize); + statsTraceCtx.outboundMessageSent(currentMessageSeqNo, currentMessageWireSize, written); } private int writeUncompressed(InputStream message, int messageLength) throws IOException { if (messageLength != -1) { - statsTraceCtx.outboundWireSize(messageLength); + currentMessageWireSize = messageLength; return writeKnownLengthUncompressed(message, messageLength); } BufferChainOutputStream bufferChain = new BufferChainOutputStream(); @@ -240,7 +248,7 @@ public class MessageFramer implements Framer { // Assign the current buffer to the last in the chain so it can be used // for future writes or written with end-of-stream=true on close. buffer = bufferList.get(bufferList.size() - 1); - statsTraceCtx.outboundWireSize(messageLength); + currentMessageWireSize = messageLength; } private static int writeToOutputStream(InputStream message, OutputStream outputStream) diff --git a/core/src/main/java/io/grpc/internal/StatsTraceContext.java b/core/src/main/java/io/grpc/internal/StatsTraceContext.java index 1326a1b856..a37a8cc1eb 100644 --- a/core/src/main/java/io/grpc/internal/StatsTraceContext.java +++ b/core/src/main/java/io/grpc/internal/StatsTraceContext.java @@ -150,27 +150,53 @@ public final class StatsTraceContext { } /** - * See {@link StreamTracer#outboundMessage}. + * See {@link StreamTracer#outboundMessage(int)}. * *

Called from {@link io.grpc.internal.Framer}. */ - public void outboundMessage() { + @SuppressWarnings("deprecation") + public void outboundMessage(int seqNo) { for (StreamTracer tracer : tracers) { + tracer.outboundMessage(seqNo); tracer.outboundMessage(); } } /** - * See {@link StreamTracer#inboundMessage}. + * See {@link StreamTracer#inboundMessage(int)}. * *

Called from {@link io.grpc.internal.MessageDeframer}. */ - public void inboundMessage() { + @SuppressWarnings("deprecation") + public void inboundMessage(int seqNo) { for (StreamTracer tracer : tracers) { + tracer.inboundMessage(seqNo); tracer.inboundMessage(); } } + /** + * See {@link StreamTracer#outboundMessageSent}. + * + *

Called from {@link io.grpc.internal.Framer}. + */ + public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + for (StreamTracer tracer : tracers) { + tracer.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize); + } + } + + /** + * See {@link StreamTracer#inboundMessageRead}. + * + *

Called from {@link io.grpc.internal.MessageDeframer}. + */ + public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + for (StreamTracer tracer : tracers) { + tracer.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize); + } + } + /** * See {@link StreamTracer#outboundUncompressedSize}. * diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index badd716516..7f9f6d9dea 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -16,8 +16,10 @@ package io.grpc.internal; +import static com.google.common.truth.Truth.assertThat; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -239,7 +241,11 @@ public class AbstractClientStreamTest { // GET requests don't have BODY. verify(sink, never()) .writeFrame(any(WritableBuffer.class), any(Boolean.class), any(Boolean.class)); - assertEquals(1, tracer.getOutboundMessageCount()); + assertThat(tracer.nextOutboundEvent()).isEqualTo("outboundMessage(0)"); + assertThat(tracer.nextOutboundEvent()).isEqualTo("outboundMessage()"); + assertThat(tracer.nextOutboundEvent()).matches("outboundMessageSent\\(0, [0-9]+, [0-9]+\\)"); + assertNull(tracer.nextOutboundEvent()); + assertNull(tracer.nextInboundEvent()); assertEquals(1, tracer.getOutboundWireSize()); assertEquals(1, tracer.getOutboundUncompressedSize()); } diff --git a/core/src/test/java/io/grpc/internal/CensusModulesTest.java b/core/src/test/java/io/grpc/internal/CensusModulesTest.java index 90b980d409..84cecd4625 100644 --- a/core/src/test/java/io/grpc/internal/CensusModulesTest.java +++ b/core/src/test/java/io/grpc/internal/CensusModulesTest.java @@ -31,8 +31,10 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; @@ -63,6 +65,7 @@ import io.opencensus.trace.AttributeValue; import io.opencensus.trace.EndSpanOptions; import io.opencensus.trace.Link; import io.opencensus.trace.NetworkEvent; +import io.opencensus.trace.NetworkEvent.Type; import io.opencensus.trace.Sampler; import io.opencensus.trace.Span; import io.opencensus.trace.SpanBuilder; @@ -90,6 +93,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -166,6 +170,8 @@ public class CensusModulesTest { private ArgumentCaptor> clientCallListenerCaptor; @Captor private ArgumentCaptor statusCaptor; + @Captor + private ArgumentCaptor networkEventCaptor; private CensusStatsModule censusStats; private CensusTracingModule censusTracing; @@ -322,20 +328,20 @@ public class CensusModulesTest { tracer.outboundHeaders(); fakeClock.forwardTime(100, MILLISECONDS); - tracer.outboundMessage(); + tracer.outboundMessage(0); tracer.outboundWireSize(1028); tracer.outboundUncompressedSize(1128); fakeClock.forwardTime(16, MILLISECONDS); - tracer.inboundMessage(); + tracer.inboundMessage(0); tracer.inboundWireSize(33); tracer.inboundUncompressedSize(67); - tracer.outboundMessage(); + tracer.outboundMessage(1); tracer.outboundWireSize(99); tracer.outboundUncompressedSize(865); fakeClock.forwardTime(24, MILLISECONDS); - tracer.inboundMessage(); + tracer.inboundMessage(1); tracer.inboundWireSize(154); tracer.inboundUncompressedSize(552); tracer.streamClosed(Status.OK); @@ -372,11 +378,32 @@ public class CensusModulesTest { eq("Sent.package1.service2.method3"), isNull(Span.class)); verify(spyClientSpan, never()).end(any(EndSpanOptions.class)); + clientStreamTracer.outboundMessage(0); + clientStreamTracer.outboundMessageSent(0, 882, -1); + clientStreamTracer.inboundMessage(0); + clientStreamTracer.outboundMessage(1); + clientStreamTracer.outboundMessageSent(1, -1, 27); + clientStreamTracer.inboundMessageRead(0, 255, 90); + clientStreamTracer.streamClosed(Status.OK); callTracer.callEnded(Status.OK); - verify(spyClientSpan).end( + InOrder inOrder = inOrder(spyClientSpan); + inOrder.verify(spyClientSpan, times(3)).addNetworkEvent(networkEventCaptor.capture()); + List events = networkEventCaptor.getAllValues(); + assertEquals( + NetworkEvent.builder(Type.SENT, 0).setCompressedMessageSize(882).build(), events.get(0)); + assertEquals( + NetworkEvent.builder(Type.SENT, 1).setUncompressedMessageSize(27).build(), events.get(1)); + assertEquals( + NetworkEvent.builder(Type.RECV, 0) + .setCompressedMessageSize(255) + .setUncompressedMessageSize(90) + .build(), + events.get(2)); + inOrder.verify(spyClientSpan).end( EndSpanOptions.builder().setStatus(io.opencensus.trace.Status.OK).build()); + verifyNoMoreInteractions(spyClientSpan); verifyNoMoreInteractions(tracer); } @@ -424,7 +451,7 @@ public class CensusModulesTest { io.opencensus.trace.Status.DEADLINE_EXCEEDED .withDescription("3 seconds")) .build()); - verify(spyClientSpan, never()).end(); + verifyNoMoreInteractions(spyClientSpan); } @Test @@ -593,20 +620,20 @@ public class CensusModulesTest { .with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(method.getFullMethodName())), statsCtx); - tracer.inboundMessage(); + tracer.inboundMessage(0); tracer.inboundWireSize(34); tracer.inboundUncompressedSize(67); fakeClock.forwardTime(100, MILLISECONDS); - tracer.outboundMessage(); + tracer.outboundMessage(0); tracer.outboundWireSize(1028); tracer.outboundUncompressedSize(1128); fakeClock.forwardTime(16, MILLISECONDS); - tracer.inboundMessage(); + tracer.inboundMessage(1); tracer.inboundWireSize(154); tracer.inboundUncompressedSize(552); - tracer.outboundMessage(); + tracer.outboundMessage(1); tracer.outboundWireSize(99); tracer.outboundUncompressedSize(865); @@ -648,12 +675,33 @@ public class CensusModulesTest { assertSame(spyServerSpan, ContextUtils.CONTEXT_SPAN_KEY.get(filteredContext)); verify(spyServerSpan, never()).end(any(EndSpanOptions.class)); + + serverStreamTracer.outboundMessage(0); + serverStreamTracer.outboundMessageSent(0, 882, -1); + serverStreamTracer.inboundMessage(0); + serverStreamTracer.outboundMessage(1); + serverStreamTracer.outboundMessageSent(1, -1, 27); + serverStreamTracer.inboundMessageRead(0, 255, 90); + serverStreamTracer.streamClosed(Status.CANCELLED); - verify(spyServerSpan).end( + InOrder inOrder = inOrder(spyServerSpan); + inOrder.verify(spyServerSpan, times(3)).addNetworkEvent(networkEventCaptor.capture()); + List events = networkEventCaptor.getAllValues(); + assertEquals( + NetworkEvent.builder(Type.SENT, 0).setCompressedMessageSize(882).build(), events.get(0)); + assertEquals( + NetworkEvent.builder(Type.SENT, 1).setUncompressedMessageSize(27).build(), events.get(1)); + assertEquals( + NetworkEvent.builder(Type.RECV, 0) + .setCompressedMessageSize(255) + .setUncompressedMessageSize(90) + .build(), + events.get(2)); + inOrder.verify(spyServerSpan).end( EndSpanOptions.builder() .setStatus(io.opencensus.trace.Status.CANCELLED).build()); - verify(spyServerSpan, never()).end(); + verifyNoMoreInteractions(spyServerSpan); } @Test @@ -713,6 +761,7 @@ public class CensusModulesTest { } @Override + @SuppressWarnings("deprecation") public void addAttributes(Map attributes) {} @Override diff --git a/core/src/test/java/io/grpc/internal/MessageDeframerTest.java b/core/src/test/java/io/grpc/internal/MessageDeframerTest.java index f0eb14a662..4e6039a6f5 100644 --- a/core/src/test/java/io/grpc/internal/MessageDeframerTest.java +++ b/core/src/test/java/io/grpc/internal/MessageDeframerTest.java @@ -18,6 +18,7 @@ package io.grpc.internal; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.atLeastOnce; @@ -77,7 +78,7 @@ public class MessageDeframerTest { assertEquals(Bytes.asList(new byte[] {3, 14}), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, 2, 2); + checkStats(2, 2); } @Test @@ -91,7 +92,7 @@ public class MessageDeframerTest { verify(listener, atLeastOnce()).bytesRead(anyInt()); assertEquals(Bytes.asList(new byte[] {14, 15}), bytes(streams.get(1).next())); verifyNoMoreInteractions(listener); - checkStats(2, 3, 3); + checkStats(1, 1, 2, 2); } @Test @@ -104,7 +105,7 @@ public class MessageDeframerTest { verify(listener).deframerClosed(false); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, 1, 1); + checkStats(1, 1); } @Test @@ -113,7 +114,7 @@ public class MessageDeframerTest { deframer.closeWhenComplete(); verify(listener).deframerClosed(false); verifyNoMoreInteractions(listener); - checkStats(0, 0, 0); + checkStats(); } @Test @@ -124,7 +125,7 @@ public class MessageDeframerTest { verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener).deframerClosed(true); verifyNoMoreInteractions(listener); - checkStats(0, 0, 0); + checkStats(); } @Test @@ -139,7 +140,7 @@ public class MessageDeframerTest { Bytes.asList(new byte[] {3, 14, 1, 5, 9, 2, 6}), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, 7, 7); + checkStats(7, 7); } @Test @@ -154,7 +155,7 @@ public class MessageDeframerTest { assertEquals(Bytes.asList(new byte[] {3}), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, 1, 1); + checkStats(1, 1); } @Test @@ -165,7 +166,7 @@ public class MessageDeframerTest { assertEquals(Bytes.asList(), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, 0, 0); + checkStats(0, 0); } @Test @@ -177,7 +178,7 @@ public class MessageDeframerTest { assertEquals(Bytes.asList(new byte[1000]), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, 1000, 1000); + checkStats(1000, 1000); } @Test @@ -192,7 +193,7 @@ public class MessageDeframerTest { verify(listener).deframerClosed(false); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, 1, 1); + checkStats(1, 1); } @Test @@ -209,7 +210,7 @@ public class MessageDeframerTest { assertEquals(Bytes.asList(new byte[1000]), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, payload.length, 1000); + checkStats(payload.length, 1000); } @Test @@ -245,8 +246,7 @@ public class MessageDeframerTest { while (stream.read() != -1) {} stream.close(); - // SizeEnforcingInputStream only reports uncompressed bytes - checkStats(0, 0, 3); + checkSizeEnforcingInputStreamStats(3); } @Test @@ -258,8 +258,7 @@ public class MessageDeframerTest { while (stream.read() != -1) {} stream.close(); - // SizeEnforcingInputStream only reports uncompressed bytes - checkStats(0, 0, 3); + checkSizeEnforcingInputStreamStats(3); } @Test @@ -289,8 +288,7 @@ public class MessageDeframerTest { assertEquals(3, read); stream.close(); - // SizeEnforcingInputStream only reports uncompressed bytes - checkStats(0, 0, 3); + checkSizeEnforcingInputStreamStats(3); } @Test @@ -304,8 +302,7 @@ public class MessageDeframerTest { assertEquals(3, read); stream.close(); - // SizeEnforcingInputStream only reports uncompressed bytes - checkStats(0, 0, 3); + checkSizeEnforcingInputStreamStats(3); } @Test @@ -336,8 +333,7 @@ public class MessageDeframerTest { assertEquals(3, skipped); stream.close(); - // SizeEnforcingInputStream only reports uncompressed bytes - checkStats(0, 0, 3); + checkSizeEnforcingInputStreamStats(3); } @Test @@ -350,8 +346,7 @@ public class MessageDeframerTest { assertEquals(3, skipped); stream.close(); - // SizeEnforcingInputStream only reports uncompressed bytes - checkStats(0, 0, 3); + checkSizeEnforcingInputStreamStats(3); } @Test @@ -384,15 +379,38 @@ public class MessageDeframerTest { assertEquals(2, skipped); stream.close(); - // SizeEnforcingInputStream only reports uncompressed bytes - checkStats(0, 0, 3); + checkSizeEnforcingInputStreamStats(3); } - private void checkStats( - int messagesReceived, long wireBytesReceived, long uncompressedBytesReceived) { - assertEquals(messagesReceived, tracer.getInboundMessageCount()); - assertEquals(wireBytesReceived, tracer.getInboundWireSize()); - assertEquals(uncompressedBytesReceived, tracer.getInboundUncompressedSize()); + /** + * @param sizes in the format {wire0, uncompressed0, wire1, uncompressed1, ...} + */ + private void checkStats(long... sizes) { + assertEquals(0, sizes.length % 2); + int count = sizes.length / 2; + long expectedWireSize = 0; + long expectedUncompressedSize = 0; + for (int i = 0; i < count; i++) { + assertEquals("inboundMessage(" + i + ")", tracer.nextInboundEvent()); + assertEquals("inboundMessage()", tracer.nextInboundEvent()); + assertEquals( + String.format("inboundMessageRead(%d, %d, -1)", i, sizes[i * 2]), + tracer.nextInboundEvent()); + expectedWireSize += sizes[i * 2]; + expectedUncompressedSize += sizes[i * 2 + 1]; + } + assertNull(tracer.nextInboundEvent()); + assertNull(tracer.nextOutboundEvent()); + assertEquals(expectedWireSize, tracer.getInboundWireSize()); + assertEquals(expectedUncompressedSize, tracer.getInboundUncompressedSize()); + } + + private void checkSizeEnforcingInputStreamStats(long uncompressedSize) { + assertNull(tracer.nextInboundEvent()); + assertNull(tracer.nextOutboundEvent()); + assertEquals(0, tracer.getInboundWireSize()); + // SizeEnforcingInputStream only reports uncompressed bytes + assertEquals(uncompressedSize, tracer.getInboundUncompressedSize()); } private static List bytes(InputStream in) { diff --git a/core/src/test/java/io/grpc/internal/MessageFramerTest.java b/core/src/test/java/io/grpc/internal/MessageFramerTest.java index 68ed71ff6f..97477bc920 100644 --- a/core/src/test/java/io/grpc/internal/MessageFramerTest.java +++ b/core/src/test/java/io/grpc/internal/MessageFramerTest.java @@ -17,6 +17,7 @@ package io.grpc.internal; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.eq; @@ -82,7 +83,7 @@ public class MessageFramerTest { verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true); assertEquals(1, allocator.allocCount); verifyNoMoreInteractions(sink); - checkStats(1, 2, 2); + checkStats(2, 2); } @Test @@ -94,7 +95,7 @@ public class MessageFramerTest { verify(sink).deliverFrame(toWriteBuffer(new byte[] {3, 14}), false, true); assertEquals(2, allocator.allocCount); verifyNoMoreInteractions(sink); - checkStats(1, 2, 2); + checkStats(2, 2); } @Test @@ -108,7 +109,7 @@ public class MessageFramerTest { toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false, true); verifyNoMoreInteractions(sink); assertEquals(1, allocator.allocCount); - checkStats(2, 2, 2); + checkStats(1, 1, 1, 1); } @Test @@ -120,7 +121,7 @@ public class MessageFramerTest { toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true, true); verifyNoMoreInteractions(sink); assertEquals(1, allocator.allocCount); - checkStats(1, 7, 7); + checkStats(7, 7); } @Test @@ -129,7 +130,7 @@ public class MessageFramerTest { verify(sink).deliverFrame(null, true, true); verifyNoMoreInteractions(sink); assertEquals(0, allocator.allocCount); - checkStats(0, 0, 0); + checkStats(); } @Test @@ -145,7 +146,7 @@ public class MessageFramerTest { verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false, true); verifyNoMoreInteractions(sink); assertEquals(2, allocator.allocCount); - checkStats(1, 8, 8); + checkStats(8, 8); } @Test @@ -162,7 +163,7 @@ public class MessageFramerTest { verify(sink).deliverFrame(toWriteBufferWithMinSize(new byte[] {1, 3}, 12), false, true); verifyNoMoreInteractions(sink); assertEquals(2, allocator.allocCount); - checkStats(2, 4, 4); + checkStats(3, 3, 1, 1); } @Test @@ -171,7 +172,7 @@ public class MessageFramerTest { framer.flush(); verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true); assertEquals(1, allocator.allocCount); - checkStats(1, 0, 0); + checkStats(0, 0); } @Test @@ -182,7 +183,7 @@ public class MessageFramerTest { verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true); // One alloc for the header assertEquals(1, allocator.allocCount); - checkStats(1, 0, 0); + checkStats(0, 0); } @Test @@ -193,7 +194,7 @@ public class MessageFramerTest { verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true); verifyNoMoreInteractions(sink); assertEquals(1, allocator.allocCount); - checkStats(1, 2, 2); + checkStats(2, 2); } @Test @@ -213,7 +214,7 @@ public class MessageFramerTest { assertEquals(toWriteBuffer(data), buffer); verifyNoMoreInteractions(sink); assertEquals(1, allocator.allocCount); - checkStats(1, 1000, 1000); + checkStats(1000, 1000); } @Test @@ -240,7 +241,7 @@ public class MessageFramerTest { verifyNoMoreInteractions(sink); assertEquals(3, allocator.allocCount); - checkStats(1, 1000, 1000); + checkStats(1000, 1000); } @Test @@ -265,7 +266,7 @@ public class MessageFramerTest { assertTrue(length < 1000); assertEquals(frameCaptor.getAllValues().get(1).size(), length); - checkStats(1, length, 1000); + checkStats(length, 1000); } @Test @@ -290,7 +291,7 @@ public class MessageFramerTest { assertEquals(1000, length); assertEquals(buffer.data.length - 5 , length); - checkStats(1, 1000, 1000); + checkStats(1000, 1000); } @Test @@ -316,7 +317,7 @@ public class MessageFramerTest { assertEquals(1000, length); assertEquals(buffer.data.length - 5 , length); - checkStats(1, 1000, 1000); + checkStats(1000, 1000); } @Test @@ -345,7 +346,7 @@ public class MessageFramerTest { writeKnownLength(framer, new byte[]{}); framer.flush(); verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true); - checkStats(1, 0, 0); + checkStats(0, 0); } private static WritableBuffer toWriteBuffer(byte[] data) { @@ -367,13 +368,27 @@ public class MessageFramerTest { // TODO(carl-mastrangelo): add framer.flush() here. } - private void checkStats(int messagesSent, long wireBytesSent, long uncompressedBytesSent) { - long actualWireSize = 0; - long actualUncompressedSize = 0; - - assertEquals(messagesSent, tracer.getOutboundMessageCount()); - assertEquals(uncompressedBytesSent, tracer.getOutboundUncompressedSize()); - assertEquals(wireBytesSent, tracer.getOutboundWireSize()); + /** + * @param sizes in the format {wire0, uncompressed0, wire1, uncompressed1, ...} + */ + private void checkStats(long... sizes) { + assertEquals(0, sizes.length % 2); + int count = sizes.length / 2; + long expectedWireSize = 0; + long expectedUncompressedSize = 0; + for (int i = 0; i < count; i++) { + assertEquals("outboundMessage(" + i + ")", tracer.nextOutboundEvent()); + assertEquals("outboundMessage()", tracer.nextOutboundEvent()); + assertEquals( + String.format("outboundMessageSent(%d, %d, %d)", i, sizes[i * 2], sizes[i * 2 + 1]), + tracer.nextOutboundEvent()); + expectedWireSize += sizes[i * 2]; + expectedUncompressedSize += sizes[i * 2 + 1]; + } + assertNull(tracer.nextOutboundEvent()); + assertNull(tracer.nextInboundEvent()); + assertEquals(expectedWireSize, tracer.getOutboundWireSize()); + assertEquals(expectedUncompressedSize, tracer.getOutboundUncompressedSize()); } static class ByteWritableBuffer implements WritableBuffer { diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java index 039c24e19b..3115fa388c 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java @@ -117,7 +117,7 @@ final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory { } @Override - public void inboundMessage() { + public void inboundMessage(int seqNo) { anythingReceived.set(true); } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index e18b341daa..83c7f43a07 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -1789,18 +1789,29 @@ public abstract class AbstractInteropTest { TestStreamTracer tracer, Collection sentMessages, Collection receivedMessages) { - assertEquals(sentMessages.size(), tracer.getOutboundMessageCount()); - assertEquals(receivedMessages.size(), tracer.getInboundMessageCount()); - long uncompressedSentSize = 0; + int seqNo = 0; for (MessageLite msg : sentMessages) { + assertThat(tracer.nextOutboundEvent()).isEqualTo(String.format("outboundMessage(%d)", seqNo)); + assertThat(tracer.nextOutboundEvent()).isEqualTo("outboundMessage()"); + assertThat(tracer.nextOutboundEvent()).matches( + String.format( + "outboundMessageSent\\(%d, -?[0-9]+, %d\\)", seqNo, msg.getSerializedSize())); + seqNo++; uncompressedSentSize += msg.getSerializedSize(); } + assertNull(tracer.nextOutboundEvent()); long uncompressedReceivedSize = 0; + seqNo = 0; for (MessageLite msg : receivedMessages) { + assertThat(tracer.nextInboundEvent()).isEqualTo(String.format("inboundMessage(%d)", seqNo)); + assertThat(tracer.nextInboundEvent()).isEqualTo("inboundMessage()"); + assertThat(tracer.nextInboundEvent()).matches( + String.format("inboundMessageRead\\(%d, -?[0-9]+, -?[0-9]+\\)", seqNo)); uncompressedReceivedSize += msg.getSerializedSize(); + seqNo++; } - + assertNull(tracer.nextInboundEvent()); assertEquals(uncompressedSentSize, tracer.getOutboundUncompressedSize()); assertEquals(uncompressedReceivedSize, tracer.getInboundUncompressedSize()); } diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index 27cb94ffd9..b96b155995 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -732,7 +732,8 @@ public abstract class AbstractTransportTest { assertTrue(clientStream.isReady()); clientStream.writeMessage(methodDescriptor.streamRequest("Hello!")); if (metricsExpected()) { - assertThat(clientStreamTracer1.getOutboundMessageCount()).isGreaterThan(0); + assertThat(clientStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage(0)"); + assertThat(clientStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage()"); } clientStream.flush(); @@ -740,10 +741,12 @@ public abstract class AbstractTransportTest { assertEquals("Hello!", methodDescriptor.parseRequest(message)); message.close(); if (metricsExpected()) { - assertThat(clientStreamTracer1.getOutboundMessageCount()).isGreaterThan(0); + assertThat(clientStreamTracer1.nextOutboundEvent()) + .matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)"); assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); - assertEquals(1, serverStreamTracer1.getInboundMessageCount()); + assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)"); + assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage()"); } assertNull("no additional message expected", serverStreamMessageQueue.poll()); @@ -753,6 +756,8 @@ public abstract class AbstractTransportTest { if (metricsExpected()) { assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L); assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); + assertThat(serverStreamTracer1.nextInboundEvent()) + .matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)"); } Metadata serverHeaders = new Metadata(); @@ -774,7 +779,8 @@ public abstract class AbstractTransportTest { assertTrue(serverStream.isReady()); serverStream.writeMessage(methodDescriptor.streamResponse("Hi. Who are you?")); if (metricsExpected()) { - assertEquals(1, serverStreamTracer1.getOutboundMessageCount()); + assertThat(serverStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage(0)"); + assertThat(serverStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage()"); } serverStream.flush(); @@ -782,13 +788,18 @@ public abstract class AbstractTransportTest { .messagesAvailable(any(StreamListener.MessageProducer.class)); message = clientStreamMessageQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS); if (metricsExpected()) { + assertThat(serverStreamTracer1.nextOutboundEvent()) + .matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)"); assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); assertTrue(clientStreamTracer1.getInboundHeaders()); - assertThat(clientStreamTracer1.getInboundMessageCount()).isGreaterThan(0); + assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)"); + assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage()"); } assertEquals("Hi. Who are you?", methodDescriptor.parseResponse(message)); if (metricsExpected()) { + assertThat(clientStreamTracer1.nextInboundEvent()) + .matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)"); assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L); assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); } @@ -804,6 +815,8 @@ public abstract class AbstractTransportTest { serverStream.close(status, trailers); if (metricsExpected()) { assertSame(status, serverStreamTracer1.getStatus()); + assertNull(serverStreamTracer1.nextInboundEvent()); + assertNull(serverStreamTracer1.nextOutboundEvent()); } verify(mockServerStreamListener, timeout(TIMEOUT_MS)).closed(statusCaptor.capture()); assertCodeEquals(Status.OK, statusCaptor.getValue()); @@ -811,6 +824,8 @@ public abstract class AbstractTransportTest { .closed(statusCaptor.capture(), metadataCaptor.capture()); if (metricsExpected()) { assertSame(statusCaptor.getValue(), clientStreamTracer1.getStatus()); + assertNull(clientStreamTracer1.nextInboundEvent()); + assertNull(clientStreamTracer1.nextOutboundEvent()); } assertEquals(status.getCode(), statusCaptor.getValue().getCode()); assertEquals(status.getDescription(), statusCaptor.getValue().getDescription()); @@ -1118,11 +1133,9 @@ public abstract class AbstractTransportTest { if (metricsExpected()) { assertTrue(clientStreamTracer1.getOutboundHeaders()); assertTrue(clientStreamTracer1.getInboundHeaders()); - assertEquals(1, clientStreamTracer1.getInboundMessageCount()); assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L); assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); assertSame(status, clientStreamTracer1.getStatus()); - assertEquals(1, serverStreamTracer1.getOutboundMessageCount()); assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); // There is a race between client cancelling and server closing. The final status seen by the diff --git a/testing/src/main/java/io/grpc/internal/testing/TestClientStreamTracer.java b/testing/src/main/java/io/grpc/internal/testing/TestClientStreamTracer.java index 7b57d41f6c..ee8c700801 100644 --- a/testing/src/main/java/io/grpc/internal/testing/TestClientStreamTracer.java +++ b/testing/src/main/java/io/grpc/internal/testing/TestClientStreamTracer.java @@ -53,11 +53,6 @@ public class TestClientStreamTracer extends ClientStreamTracer implements TestSt return outboundHeadersCalled.get(); } - @Override - public int getInboundMessageCount() { - return delegate.getInboundMessageCount(); - } - @Override public Status getStatus() { return delegate.getStatus(); @@ -73,11 +68,6 @@ public class TestClientStreamTracer extends ClientStreamTracer implements TestSt return delegate.getInboundUncompressedSize(); } - @Override - public int getOutboundMessageCount() { - return delegate.getOutboundMessageCount(); - } - @Override public long getOutboundWireSize() { return delegate.getOutboundWireSize(); @@ -88,6 +78,16 @@ public class TestClientStreamTracer extends ClientStreamTracer implements TestSt return delegate.getOutboundUncompressedSize(); } + @Override + public String nextOutboundEvent() { + return delegate.nextOutboundEvent(); + } + + @Override + public String nextInboundEvent() { + return delegate.nextInboundEvent(); + } + @Override public void outboundWireSize(long bytes) { delegate.outboundWireSize(bytes); @@ -114,15 +114,37 @@ public class TestClientStreamTracer extends ClientStreamTracer implements TestSt } @Override + @SuppressWarnings("deprecation") public void inboundMessage() { delegate.inboundMessage(); } @Override + public void inboundMessage(int seqNo) { + delegate.inboundMessage(seqNo); + } + + @Override + @SuppressWarnings("deprecation") public void outboundMessage() { delegate.outboundMessage(); } + @Override + public void outboundMessage(int seqNo) { + delegate.outboundMessage(seqNo); + } + + @Override + public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + delegate.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize); + } + + @Override + public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + delegate.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize); + } + @Override public void outboundHeaders() { if (!outboundHeadersCalled.compareAndSet(false, true) diff --git a/testing/src/main/java/io/grpc/internal/testing/TestServerStreamTracer.java b/testing/src/main/java/io/grpc/internal/testing/TestServerStreamTracer.java index 83a2663e6f..9da476b823 100644 --- a/testing/src/main/java/io/grpc/internal/testing/TestServerStreamTracer.java +++ b/testing/src/main/java/io/grpc/internal/testing/TestServerStreamTracer.java @@ -47,11 +47,6 @@ public class TestServerStreamTracer extends ServerStreamTracer implements TestSt return serverCall.get(); } - @Override - public int getInboundMessageCount() { - return delegate.getInboundMessageCount(); - } - @Override public Status getStatus() { return delegate.getStatus(); @@ -67,11 +62,6 @@ public class TestServerStreamTracer extends ServerStreamTracer implements TestSt return delegate.getInboundUncompressedSize(); } - @Override - public int getOutboundMessageCount() { - return delegate.getOutboundMessageCount(); - } - @Override public long getOutboundWireSize() { return delegate.getOutboundWireSize(); @@ -82,6 +72,16 @@ public class TestServerStreamTracer extends ServerStreamTracer implements TestSt return delegate.getOutboundUncompressedSize(); } + @Override + public String nextOutboundEvent() { + return delegate.nextOutboundEvent(); + } + + @Override + public String nextInboundEvent() { + return delegate.nextInboundEvent(); + } + @Override public void outboundWireSize(long bytes) { delegate.outboundWireSize(bytes); @@ -108,15 +108,37 @@ public class TestServerStreamTracer extends ServerStreamTracer implements TestSt } @Override + @SuppressWarnings("deprecation") public void inboundMessage() { delegate.inboundMessage(); } @Override + public void inboundMessage(int seqNo) { + delegate.inboundMessage(seqNo); + } + + @Override + @SuppressWarnings("deprecation") public void outboundMessage() { delegate.outboundMessage(); } + @Override + public void outboundMessage(int seqNo) { + delegate.outboundMessage(seqNo); + } + + @Override + public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + delegate.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize); + } + + @Override + public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + delegate.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize); + } + @Override public void serverCallStarted(ServerCall call) { if (!serverCall.compareAndSet(null, call) && delegate.failDuplicateCallbacks.get()) { diff --git a/testing/src/main/java/io/grpc/internal/testing/TestStreamTracer.java b/testing/src/main/java/io/grpc/internal/testing/TestStreamTracer.java index 1a98a0f031..216c5b2a69 100644 --- a/testing/src/main/java/io/grpc/internal/testing/TestStreamTracer.java +++ b/testing/src/main/java/io/grpc/internal/testing/TestStreamTracer.java @@ -19,11 +19,12 @@ package io.grpc.internal.testing; import io.grpc.Status; import io.grpc.StreamTracer; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; /** * A {@link StreamTracer} suitable for testing. @@ -40,16 +41,6 @@ public interface TestStreamTracer { */ boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException; - /** - * Returns how many times {@link StreamTracer#inboundMessage} has been called. - */ - int getInboundMessageCount(); - - /** - * Returns how many times {@link StreamTracer#outboundMessage} has been called. - */ - int getOutboundMessageCount(); - /** * Returns the status passed to {@link StreamTracer#streamClosed}. */ @@ -75,6 +66,17 @@ public interface TestStreamTracer { */ long getOutboundUncompressedSize(); + /** + * Returns the next captured outbound message event. + */ + @Nullable + String nextOutboundEvent(); + + /** + * Returns the next captured outbound message event. + */ + String nextInboundEvent(); + /** * A {@link StreamTracer} suitable for testing. */ @@ -84,8 +86,8 @@ public interface TestStreamTracer { protected final AtomicLong inboundWireSize = new AtomicLong(); protected final AtomicLong outboundUncompressedSize = new AtomicLong(); protected final AtomicLong inboundUncompressedSize = new AtomicLong(); - protected final AtomicInteger inboundMessageCount = new AtomicInteger(); - protected final AtomicInteger outboundMessageCount = new AtomicInteger(); + protected final LinkedBlockingQueue outboundEvents = new LinkedBlockingQueue(); + protected final LinkedBlockingQueue inboundEvents = new LinkedBlockingQueue(); protected final AtomicReference streamClosedStatus = new AtomicReference(); protected final CountDownLatch streamClosed = new CountDownLatch(1); protected final AtomicBoolean failDuplicateCallbacks = new AtomicBoolean(true); @@ -100,16 +102,6 @@ public interface TestStreamTracer { return streamClosed.await(timeout, timeUnit); } - @Override - public int getInboundMessageCount() { - return inboundMessageCount.get(); - } - - @Override - public int getOutboundMessageCount() { - return outboundMessageCount.get(); - } - @Override public Status getStatus() { return streamClosedStatus.get(); @@ -167,13 +159,52 @@ public interface TestStreamTracer { } @Override + @SuppressWarnings("deprecation") public void inboundMessage() { - inboundMessageCount.incrementAndGet(); + inboundEvents.add("inboundMessage()"); } @Override + public void inboundMessage(int seqNo) { + inboundEvents.add("inboundMessage(" + seqNo + ")"); + } + + @Override + @SuppressWarnings("deprecation") public void outboundMessage() { - outboundMessageCount.incrementAndGet(); + outboundEvents.add("outboundMessage()"); + } + + @Override + public void outboundMessage(int seqNo) { + outboundEvents.add("outboundMessage(" + seqNo + ")"); + } + + @Override + public void outboundMessageSent( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + outboundEvents.add( + String.format( + "outboundMessageSent(%d, %d, %d)", + seqNo, optionalWireSize, optionalUncompressedSize)); + } + + @Override + public void inboundMessageRead( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + inboundEvents.add( + String.format( + "inboundMessageRead(%d, %d, %d)", seqNo, optionalWireSize, optionalUncompressedSize)); + } + + @Override + public String nextOutboundEvent() { + return outboundEvents.poll(); + } + + @Override + public String nextInboundEvent() { + return inboundEvents.poll(); } } }