core: record individual messages with sizes to Census/tracing (#3461)

Two methods, outboundMessageSent() and inboundMessageRead() are added to StreamTracer in order to associate individual messages with sizes. Both types of sizes are optional, as allowed by Census tracing.

Both methods accept a sequence number as the type ID as required by Census. The original outboundMesage() and inboundMessage() are also replaced by overrides that take the sequence number, to better match the new methods. The deprecation of the old overrides are tracked by #3460
This commit is contained in:
Kun Zhang 2017-09-19 09:22:11 -07:00 committed by GitHub
parent a3ff9cd784
commit 7e534ed704
19 changed files with 468 additions and 140 deletions

View File

@ -190,7 +190,7 @@ subprojects {
google_auth_credentials: 'com.google.auth:google-auth-library-credentials:0.4.0',
okhttp: 'com.squareup.okhttp:okhttp:2.5.0',
okio: 'com.squareup.okio:okio:1.6.0',
opencensus_api: 'io.opencensus:opencensus-api:0.5.1',
opencensus_api: 'io.opencensus:opencensus-api:0.6.0',
instrumentation_api: 'com.google.instrumentation:instrumentation-api:0.4.3',
protobuf: "com.google.protobuf:protobuf-java:${protobufVersion}",
protobuf_lite: "com.google.protobuf:protobuf-lite:3.0.1",

View File

@ -12,6 +12,8 @@ dependencies {
exclude group: 'io.grpc', module: 'grpc-context'
}
compile (libraries.opencensus_api) {
// prefer 3.0.0 from libraries instead of 3.0.1
exclude group: 'com.google.code.findbugs', module: 'jsr305'
// prefer 2.0.19 from libraries instead of 2.0.11
exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
// we'll always be more up-to-date

View File

@ -36,18 +36,68 @@ public abstract class StreamTracer {
* An outbound message has been passed to the stream. This is called as soon as the stream knows
* about the message, but doesn't have further guarantee such as whether the message is serialized
* or not.
*
* @deprecated use {@link #outboundMessage(int)}
*/
@Deprecated
public void outboundMessage() {
}
/**
* An outbound message has been passed to the stream. This is called as soon as the stream knows
* about the message, but doesn't have further guarantee such as whether the message is serialized
* or not.
*
* @param seqNo the sequential number of the message within the stream, starting from 0. It can
* be used to correlate with {@link #outboundMessageSent} for the same message.
*/
public void outboundMessage(int seqNo) {
}
/**
* An inbound message has been received by the stream. This is called as soon as the stream knows
* about the message, but doesn't have further guarantee such as whether the message is
* deserialized or not.
*
* @deprecated use {@link #inboundMessage(int)}
*/
@Deprecated
public void inboundMessage() {
}
/**
* An inbound message has been received by the stream. This is called as soon as the stream knows
* about the message, but doesn't have further guarantee such as whether the message is
* deserialized or not.
*
* @param seqNo the sequential number of the message within the stream, starting from 0. It can
* be used to correlate with {@link #inboundMessageRead} for the same message.
*/
public void inboundMessage(int seqNo) {
}
/**
* An outbound message has been serialized and sent to the transport.
*
* @param seqNo the sequential number of the message within the stream, starting from 0. It can
* be used to correlate with {@link #outboundMessage(int)} for the same message.
* @param optionalWireSize the wire size of the message. -1 if unknown
* @param optionalUncompressedSize the uncompressed serialized size of the message. -1 if unknown
*/
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
}
/**
* An inbound message has been fully read from the transport.
*
* @param seqNo the sequential number of the message within the stream, starting from 0. It can
* be used to correlate with {@link #inboundMessage(int)} for the same message.
* @param optionalWireSize the wire size of the message. -1 if unknown
* @param optionalUncompressedSize the uncompressed serialized size of the message. -1 if unknown
*/
public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
}
/**
* The wire size of some outbound data is revealed. This can only used to record the accumulative
* outbound wire size. There is no guarantee wrt timing or granularity of this method.

View File

@ -359,7 +359,8 @@ public abstract class AbstractClientStream extends AbstractStream
} catch (java.io.IOException ex) {
throw new RuntimeException(ex);
}
statsTraceCtx.outboundMessage();
statsTraceCtx.outboundMessage(0);
statsTraceCtx.outboundMessageSent(0, payload.length, payload.length);
statsTraceCtx.outboundUncompressedSize(payload.length);
// NB(zhangkun83): this is not accurate, because the underlying transport will probably encode
// it using e.g., base64. However, we are not supposed to know such detail here.

View File

@ -161,12 +161,12 @@ final class CensusStatsModule {
}
@Override
public void inboundMessage() {
public void inboundMessage(int seqNo) {
inboundMessageCount.incrementAndGet();
}
@Override
public void outboundMessage() {
public void outboundMessage(int seqNo) {
outboundMessageCount.incrementAndGet();
}
}
@ -282,12 +282,12 @@ final class CensusStatsModule {
}
@Override
public void inboundMessage() {
public void inboundMessage(int seqNo) {
inboundMessageCount.incrementAndGet();
}
@Override
public void outboundMessage() {
public void outboundMessage(int seqNo) {
outboundMessageCount.incrementAndGet();
}

View File

@ -33,6 +33,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer;
import io.grpc.StreamTracer;
import io.opencensus.trace.EndSpanOptions;
import io.opencensus.trace.NetworkEvent;
import io.opencensus.trace.Span;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.Status;
@ -56,8 +57,6 @@ import javax.annotation.Nullable;
*/
final class CensusTracingModule {
private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName());
// TODO(zhangkun83): record NetworkEvent to Span for each message
private static final ClientStreamTracer noopClientTracer = new ClientStreamTracer() {};
private final Tracer censusTracer;
@VisibleForTesting
@ -182,6 +181,19 @@ final class CensusTracingModule {
return EndSpanOptions.builder().setStatus(convertStatus(status)).build();
}
private static void recordNetworkEvent(
Span span, NetworkEvent.Type type,
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
NetworkEvent.Builder eventBuilder = NetworkEvent.builder(type, seqNo);
if (optionalUncompressedSize != -1) {
eventBuilder.setUncompressedMessageSize(optionalUncompressedSize);
}
if (optionalWireSize != -1) {
eventBuilder.setCompressedMessageSize(optionalWireSize);
}
span.addNetworkEvent(eventBuilder.build());
}
@VisibleForTesting
final class ClientCallTracer extends ClientStreamTracer.Factory {
@ -201,7 +213,7 @@ final class CensusTracingModule {
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
headers.discardAll(tracingHeader);
headers.put(tracingHeader, span.getContext());
return noopClientTracer;
return new ClientTracer(span);
}
/**
@ -218,6 +230,28 @@ final class CensusTracingModule {
}
}
private static final class ClientTracer extends ClientStreamTracer {
private final Span span;
ClientTracer(Span span) {
this.span = checkNotNull(span, "span");
}
@Override
public void outboundMessageSent(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
recordNetworkEvent(
span, NetworkEvent.Type.SENT, seqNo, optionalWireSize, optionalUncompressedSize);
}
@Override
public void inboundMessageRead(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
recordNetworkEvent(
span, NetworkEvent.Type.RECV, seqNo, optionalWireSize, optionalUncompressedSize);
}
}
private final class ServerTracer extends ServerStreamTracer {
private final Span span;
private final AtomicBoolean streamClosed = new AtomicBoolean(false);
@ -252,6 +286,20 @@ final class CensusTracingModule {
// inherit from the parent Context.
return context.withValue(CONTEXT_SPAN_KEY, span);
}
@Override
public void outboundMessageSent(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
recordNetworkEvent(
span, NetworkEvent.Type.SENT, seqNo, optionalWireSize, optionalUncompressedSize);
}
@Override
public void inboundMessageRead(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
recordNetworkEvent(
span, NetworkEvent.Type.RECV, seqNo, optionalWireSize, optionalUncompressedSize);
}
}
@VisibleForTesting

View File

@ -94,6 +94,7 @@ public class MessageDeframer implements Closeable, Deframer {
private CompositeReadableBuffer unprocessed = new CompositeReadableBuffer();
private long pendingDeliveries;
private boolean inDelivery = false;
private int currentMessageSeqNo = -1;
private boolean closeWhenComplete = false;
private volatile boolean stopDelivery = false;
@ -317,7 +318,8 @@ public class MessageDeframer implements Closeable, Deframer {
.asRuntimeException();
}
statsTraceCtx.inboundMessage();
currentMessageSeqNo++;
statsTraceCtx.inboundMessage(currentMessageSeqNo);
// Continue reading the frame body.
state = State.BODY;
}
@ -326,6 +328,10 @@ public class MessageDeframer implements Closeable, Deframer {
* Processes the GRPC message body, which depending on frame header flags may be compressed.
*/
private void processBody() {
// There is no reliable way to get the uncompressed size per message when it's compressed,
// because the uncompressed bytes are provided through an InputStream whose total size is
// unknown until all bytes are read, and we don't know when it happens.
statsTraceCtx.inboundMessageRead(currentMessageSeqNo, requiredLength, -1);
InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
nextFrame = null;
listener.messagesAvailable(new SingleMessageProducer(stream));

View File

@ -75,6 +75,10 @@ public class MessageFramer implements Framer {
private final StatsTraceContext statsTraceCtx;
private boolean closed;
// Tracing and stats-related states
private int currentMessageSeqNo = -1;
private long currentMessageWireSize;
/**
* Creates a {@code MessageFramer}.
*
@ -114,7 +118,9 @@ public class MessageFramer implements Framer {
@Override
public void writePayload(InputStream message) {
verifyNotClosed();
statsTraceCtx.outboundMessage();
currentMessageSeqNo++;
currentMessageWireSize = 0;
statsTraceCtx.outboundMessage(currentMessageSeqNo);
boolean compressed = messageCompression && compressor != Codec.Identity.NONE;
int written = -1;
int messageLength = -2;
@ -143,11 +149,13 @@ public class MessageFramer implements Framer {
throw Status.INTERNAL.withDescription(err).asRuntimeException();
}
statsTraceCtx.outboundUncompressedSize(written);
statsTraceCtx.outboundWireSize(currentMessageWireSize);
statsTraceCtx.outboundMessageSent(currentMessageSeqNo, currentMessageWireSize, written);
}
private int writeUncompressed(InputStream message, int messageLength) throws IOException {
if (messageLength != -1) {
statsTraceCtx.outboundWireSize(messageLength);
currentMessageWireSize = messageLength;
return writeKnownLengthUncompressed(message, messageLength);
}
BufferChainOutputStream bufferChain = new BufferChainOutputStream();
@ -240,7 +248,7 @@ public class MessageFramer implements Framer {
// Assign the current buffer to the last in the chain so it can be used
// for future writes or written with end-of-stream=true on close.
buffer = bufferList.get(bufferList.size() - 1);
statsTraceCtx.outboundWireSize(messageLength);
currentMessageWireSize = messageLength;
}
private static int writeToOutputStream(InputStream message, OutputStream outputStream)

View File

@ -150,27 +150,53 @@ public final class StatsTraceContext {
}
/**
* See {@link StreamTracer#outboundMessage}.
* See {@link StreamTracer#outboundMessage(int)}.
*
* <p>Called from {@link io.grpc.internal.Framer}.
*/
public void outboundMessage() {
@SuppressWarnings("deprecation")
public void outboundMessage(int seqNo) {
for (StreamTracer tracer : tracers) {
tracer.outboundMessage(seqNo);
tracer.outboundMessage();
}
}
/**
* See {@link StreamTracer#inboundMessage}.
* See {@link StreamTracer#inboundMessage(int)}.
*
* <p>Called from {@link io.grpc.internal.MessageDeframer}.
*/
public void inboundMessage() {
@SuppressWarnings("deprecation")
public void inboundMessage(int seqNo) {
for (StreamTracer tracer : tracers) {
tracer.inboundMessage(seqNo);
tracer.inboundMessage();
}
}
/**
* See {@link StreamTracer#outboundMessageSent}.
*
* <p>Called from {@link io.grpc.internal.Framer}.
*/
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
for (StreamTracer tracer : tracers) {
tracer.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize);
}
}
/**
* See {@link StreamTracer#inboundMessageRead}.
*
* <p>Called from {@link io.grpc.internal.MessageDeframer}.
*/
public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
for (StreamTracer tracer : tracers) {
tracer.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize);
}
}
/**
* See {@link StreamTracer#outboundUncompressedSize}.
*

View File

@ -16,8 +16,10 @@
package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@ -239,7 +241,11 @@ public class AbstractClientStreamTest {
// GET requests don't have BODY.
verify(sink, never())
.writeFrame(any(WritableBuffer.class), any(Boolean.class), any(Boolean.class));
assertEquals(1, tracer.getOutboundMessageCount());
assertThat(tracer.nextOutboundEvent()).isEqualTo("outboundMessage(0)");
assertThat(tracer.nextOutboundEvent()).isEqualTo("outboundMessage()");
assertThat(tracer.nextOutboundEvent()).matches("outboundMessageSent\\(0, [0-9]+, [0-9]+\\)");
assertNull(tracer.nextOutboundEvent());
assertNull(tracer.nextInboundEvent());
assertEquals(1, tracer.getOutboundWireSize());
assertEquals(1, tracer.getOutboundUncompressedSize());
}

View File

@ -31,8 +31,10 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
@ -63,6 +65,7 @@ import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.EndSpanOptions;
import io.opencensus.trace.Link;
import io.opencensus.trace.NetworkEvent;
import io.opencensus.trace.NetworkEvent.Type;
import io.opencensus.trace.Sampler;
import io.opencensus.trace.Span;
import io.opencensus.trace.SpanBuilder;
@ -90,6 +93,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@ -166,6 +170,8 @@ public class CensusModulesTest {
private ArgumentCaptor<ClientCall.Listener<String>> clientCallListenerCaptor;
@Captor
private ArgumentCaptor<Status> statusCaptor;
@Captor
private ArgumentCaptor<NetworkEvent> networkEventCaptor;
private CensusStatsModule censusStats;
private CensusTracingModule censusTracing;
@ -322,20 +328,20 @@ public class CensusModulesTest {
tracer.outboundHeaders();
fakeClock.forwardTime(100, MILLISECONDS);
tracer.outboundMessage();
tracer.outboundMessage(0);
tracer.outboundWireSize(1028);
tracer.outboundUncompressedSize(1128);
fakeClock.forwardTime(16, MILLISECONDS);
tracer.inboundMessage();
tracer.inboundMessage(0);
tracer.inboundWireSize(33);
tracer.inboundUncompressedSize(67);
tracer.outboundMessage();
tracer.outboundMessage(1);
tracer.outboundWireSize(99);
tracer.outboundUncompressedSize(865);
fakeClock.forwardTime(24, MILLISECONDS);
tracer.inboundMessage();
tracer.inboundMessage(1);
tracer.inboundWireSize(154);
tracer.inboundUncompressedSize(552);
tracer.streamClosed(Status.OK);
@ -372,11 +378,32 @@ public class CensusModulesTest {
eq("Sent.package1.service2.method3"), isNull(Span.class));
verify(spyClientSpan, never()).end(any(EndSpanOptions.class));
clientStreamTracer.outboundMessage(0);
clientStreamTracer.outboundMessageSent(0, 882, -1);
clientStreamTracer.inboundMessage(0);
clientStreamTracer.outboundMessage(1);
clientStreamTracer.outboundMessageSent(1, -1, 27);
clientStreamTracer.inboundMessageRead(0, 255, 90);
clientStreamTracer.streamClosed(Status.OK);
callTracer.callEnded(Status.OK);
verify(spyClientSpan).end(
InOrder inOrder = inOrder(spyClientSpan);
inOrder.verify(spyClientSpan, times(3)).addNetworkEvent(networkEventCaptor.capture());
List<NetworkEvent> events = networkEventCaptor.getAllValues();
assertEquals(
NetworkEvent.builder(Type.SENT, 0).setCompressedMessageSize(882).build(), events.get(0));
assertEquals(
NetworkEvent.builder(Type.SENT, 1).setUncompressedMessageSize(27).build(), events.get(1));
assertEquals(
NetworkEvent.builder(Type.RECV, 0)
.setCompressedMessageSize(255)
.setUncompressedMessageSize(90)
.build(),
events.get(2));
inOrder.verify(spyClientSpan).end(
EndSpanOptions.builder().setStatus(io.opencensus.trace.Status.OK).build());
verifyNoMoreInteractions(spyClientSpan);
verifyNoMoreInteractions(tracer);
}
@ -424,7 +451,7 @@ public class CensusModulesTest {
io.opencensus.trace.Status.DEADLINE_EXCEEDED
.withDescription("3 seconds"))
.build());
verify(spyClientSpan, never()).end();
verifyNoMoreInteractions(spyClientSpan);
}
@Test
@ -593,20 +620,20 @@ public class CensusModulesTest {
.with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(method.getFullMethodName())),
statsCtx);
tracer.inboundMessage();
tracer.inboundMessage(0);
tracer.inboundWireSize(34);
tracer.inboundUncompressedSize(67);
fakeClock.forwardTime(100, MILLISECONDS);
tracer.outboundMessage();
tracer.outboundMessage(0);
tracer.outboundWireSize(1028);
tracer.outboundUncompressedSize(1128);
fakeClock.forwardTime(16, MILLISECONDS);
tracer.inboundMessage();
tracer.inboundMessage(1);
tracer.inboundWireSize(154);
tracer.inboundUncompressedSize(552);
tracer.outboundMessage();
tracer.outboundMessage(1);
tracer.outboundWireSize(99);
tracer.outboundUncompressedSize(865);
@ -648,12 +675,33 @@ public class CensusModulesTest {
assertSame(spyServerSpan, ContextUtils.CONTEXT_SPAN_KEY.get(filteredContext));
verify(spyServerSpan, never()).end(any(EndSpanOptions.class));
serverStreamTracer.outboundMessage(0);
serverStreamTracer.outboundMessageSent(0, 882, -1);
serverStreamTracer.inboundMessage(0);
serverStreamTracer.outboundMessage(1);
serverStreamTracer.outboundMessageSent(1, -1, 27);
serverStreamTracer.inboundMessageRead(0, 255, 90);
serverStreamTracer.streamClosed(Status.CANCELLED);
verify(spyServerSpan).end(
InOrder inOrder = inOrder(spyServerSpan);
inOrder.verify(spyServerSpan, times(3)).addNetworkEvent(networkEventCaptor.capture());
List<NetworkEvent> events = networkEventCaptor.getAllValues();
assertEquals(
NetworkEvent.builder(Type.SENT, 0).setCompressedMessageSize(882).build(), events.get(0));
assertEquals(
NetworkEvent.builder(Type.SENT, 1).setUncompressedMessageSize(27).build(), events.get(1));
assertEquals(
NetworkEvent.builder(Type.RECV, 0)
.setCompressedMessageSize(255)
.setUncompressedMessageSize(90)
.build(),
events.get(2));
inOrder.verify(spyServerSpan).end(
EndSpanOptions.builder()
.setStatus(io.opencensus.trace.Status.CANCELLED).build());
verify(spyServerSpan, never()).end();
verifyNoMoreInteractions(spyServerSpan);
}
@Test
@ -713,6 +761,7 @@ public class CensusModulesTest {
}
@Override
@SuppressWarnings("deprecation")
public void addAttributes(Map<String, AttributeValue> attributes) {}
@Override

View File

@ -18,6 +18,7 @@ package io.grpc.internal;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.atLeastOnce;
@ -77,7 +78,7 @@ public class MessageDeframerTest {
assertEquals(Bytes.asList(new byte[] {3, 14}), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
checkStats(1, 2, 2);
checkStats(2, 2);
}
@Test
@ -91,7 +92,7 @@ public class MessageDeframerTest {
verify(listener, atLeastOnce()).bytesRead(anyInt());
assertEquals(Bytes.asList(new byte[] {14, 15}), bytes(streams.get(1).next()));
verifyNoMoreInteractions(listener);
checkStats(2, 3, 3);
checkStats(1, 1, 2, 2);
}
@Test
@ -104,7 +105,7 @@ public class MessageDeframerTest {
verify(listener).deframerClosed(false);
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
checkStats(1, 1, 1);
checkStats(1, 1);
}
@Test
@ -113,7 +114,7 @@ public class MessageDeframerTest {
deframer.closeWhenComplete();
verify(listener).deframerClosed(false);
verifyNoMoreInteractions(listener);
checkStats(0, 0, 0);
checkStats();
}
@Test
@ -124,7 +125,7 @@ public class MessageDeframerTest {
verify(listener, atLeastOnce()).bytesRead(anyInt());
verify(listener).deframerClosed(true);
verifyNoMoreInteractions(listener);
checkStats(0, 0, 0);
checkStats();
}
@Test
@ -139,7 +140,7 @@ public class MessageDeframerTest {
Bytes.asList(new byte[] {3, 14, 1, 5, 9, 2, 6}), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
checkStats(1, 7, 7);
checkStats(7, 7);
}
@Test
@ -154,7 +155,7 @@ public class MessageDeframerTest {
assertEquals(Bytes.asList(new byte[] {3}), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
checkStats(1, 1, 1);
checkStats(1, 1);
}
@Test
@ -165,7 +166,7 @@ public class MessageDeframerTest {
assertEquals(Bytes.asList(), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
checkStats(1, 0, 0);
checkStats(0, 0);
}
@Test
@ -177,7 +178,7 @@ public class MessageDeframerTest {
assertEquals(Bytes.asList(new byte[1000]), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
checkStats(1, 1000, 1000);
checkStats(1000, 1000);
}
@Test
@ -192,7 +193,7 @@ public class MessageDeframerTest {
verify(listener).deframerClosed(false);
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
checkStats(1, 1, 1);
checkStats(1, 1);
}
@Test
@ -209,7 +210,7 @@ public class MessageDeframerTest {
assertEquals(Bytes.asList(new byte[1000]), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
checkStats(1, payload.length, 1000);
checkStats(payload.length, 1000);
}
@Test
@ -245,8 +246,7 @@ public class MessageDeframerTest {
while (stream.read() != -1) {}
stream.close();
// SizeEnforcingInputStream only reports uncompressed bytes
checkStats(0, 0, 3);
checkSizeEnforcingInputStreamStats(3);
}
@Test
@ -258,8 +258,7 @@ public class MessageDeframerTest {
while (stream.read() != -1) {}
stream.close();
// SizeEnforcingInputStream only reports uncompressed bytes
checkStats(0, 0, 3);
checkSizeEnforcingInputStreamStats(3);
}
@Test
@ -289,8 +288,7 @@ public class MessageDeframerTest {
assertEquals(3, read);
stream.close();
// SizeEnforcingInputStream only reports uncompressed bytes
checkStats(0, 0, 3);
checkSizeEnforcingInputStreamStats(3);
}
@Test
@ -304,8 +302,7 @@ public class MessageDeframerTest {
assertEquals(3, read);
stream.close();
// SizeEnforcingInputStream only reports uncompressed bytes
checkStats(0, 0, 3);
checkSizeEnforcingInputStreamStats(3);
}
@Test
@ -336,8 +333,7 @@ public class MessageDeframerTest {
assertEquals(3, skipped);
stream.close();
// SizeEnforcingInputStream only reports uncompressed bytes
checkStats(0, 0, 3);
checkSizeEnforcingInputStreamStats(3);
}
@Test
@ -350,8 +346,7 @@ public class MessageDeframerTest {
assertEquals(3, skipped);
stream.close();
// SizeEnforcingInputStream only reports uncompressed bytes
checkStats(0, 0, 3);
checkSizeEnforcingInputStreamStats(3);
}
@Test
@ -384,15 +379,38 @@ public class MessageDeframerTest {
assertEquals(2, skipped);
stream.close();
// SizeEnforcingInputStream only reports uncompressed bytes
checkStats(0, 0, 3);
checkSizeEnforcingInputStreamStats(3);
}
private void checkStats(
int messagesReceived, long wireBytesReceived, long uncompressedBytesReceived) {
assertEquals(messagesReceived, tracer.getInboundMessageCount());
assertEquals(wireBytesReceived, tracer.getInboundWireSize());
assertEquals(uncompressedBytesReceived, tracer.getInboundUncompressedSize());
/**
* @param sizes in the format {wire0, uncompressed0, wire1, uncompressed1, ...}
*/
private void checkStats(long... sizes) {
assertEquals(0, sizes.length % 2);
int count = sizes.length / 2;
long expectedWireSize = 0;
long expectedUncompressedSize = 0;
for (int i = 0; i < count; i++) {
assertEquals("inboundMessage(" + i + ")", tracer.nextInboundEvent());
assertEquals("inboundMessage()", tracer.nextInboundEvent());
assertEquals(
String.format("inboundMessageRead(%d, %d, -1)", i, sizes[i * 2]),
tracer.nextInboundEvent());
expectedWireSize += sizes[i * 2];
expectedUncompressedSize += sizes[i * 2 + 1];
}
assertNull(tracer.nextInboundEvent());
assertNull(tracer.nextOutboundEvent());
assertEquals(expectedWireSize, tracer.getInboundWireSize());
assertEquals(expectedUncompressedSize, tracer.getInboundUncompressedSize());
}
private void checkSizeEnforcingInputStreamStats(long uncompressedSize) {
assertNull(tracer.nextInboundEvent());
assertNull(tracer.nextOutboundEvent());
assertEquals(0, tracer.getInboundWireSize());
// SizeEnforcingInputStream only reports uncompressed bytes
assertEquals(uncompressedSize, tracer.getInboundUncompressedSize());
}
private static List<Byte> bytes(InputStream in) {

View File

@ -17,6 +17,7 @@
package io.grpc.internal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq;
@ -82,7 +83,7 @@ public class MessageFramerTest {
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true);
assertEquals(1, allocator.allocCount);
verifyNoMoreInteractions(sink);
checkStats(1, 2, 2);
checkStats(2, 2);
}
@Test
@ -94,7 +95,7 @@ public class MessageFramerTest {
verify(sink).deliverFrame(toWriteBuffer(new byte[] {3, 14}), false, true);
assertEquals(2, allocator.allocCount);
verifyNoMoreInteractions(sink);
checkStats(1, 2, 2);
checkStats(2, 2);
}
@Test
@ -108,7 +109,7 @@ public class MessageFramerTest {
toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false, true);
verifyNoMoreInteractions(sink);
assertEquals(1, allocator.allocCount);
checkStats(2, 2, 2);
checkStats(1, 1, 1, 1);
}
@Test
@ -120,7 +121,7 @@ public class MessageFramerTest {
toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true, true);
verifyNoMoreInteractions(sink);
assertEquals(1, allocator.allocCount);
checkStats(1, 7, 7);
checkStats(7, 7);
}
@Test
@ -129,7 +130,7 @@ public class MessageFramerTest {
verify(sink).deliverFrame(null, true, true);
verifyNoMoreInteractions(sink);
assertEquals(0, allocator.allocCount);
checkStats(0, 0, 0);
checkStats();
}
@Test
@ -145,7 +146,7 @@ public class MessageFramerTest {
verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false, true);
verifyNoMoreInteractions(sink);
assertEquals(2, allocator.allocCount);
checkStats(1, 8, 8);
checkStats(8, 8);
}
@Test
@ -162,7 +163,7 @@ public class MessageFramerTest {
verify(sink).deliverFrame(toWriteBufferWithMinSize(new byte[] {1, 3}, 12), false, true);
verifyNoMoreInteractions(sink);
assertEquals(2, allocator.allocCount);
checkStats(2, 4, 4);
checkStats(3, 3, 1, 1);
}
@Test
@ -171,7 +172,7 @@ public class MessageFramerTest {
framer.flush();
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true);
assertEquals(1, allocator.allocCount);
checkStats(1, 0, 0);
checkStats(0, 0);
}
@Test
@ -182,7 +183,7 @@ public class MessageFramerTest {
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true);
// One alloc for the header
assertEquals(1, allocator.allocCount);
checkStats(1, 0, 0);
checkStats(0, 0);
}
@Test
@ -193,7 +194,7 @@ public class MessageFramerTest {
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true);
verifyNoMoreInteractions(sink);
assertEquals(1, allocator.allocCount);
checkStats(1, 2, 2);
checkStats(2, 2);
}
@Test
@ -213,7 +214,7 @@ public class MessageFramerTest {
assertEquals(toWriteBuffer(data), buffer);
verifyNoMoreInteractions(sink);
assertEquals(1, allocator.allocCount);
checkStats(1, 1000, 1000);
checkStats(1000, 1000);
}
@Test
@ -240,7 +241,7 @@ public class MessageFramerTest {
verifyNoMoreInteractions(sink);
assertEquals(3, allocator.allocCount);
checkStats(1, 1000, 1000);
checkStats(1000, 1000);
}
@Test
@ -265,7 +266,7 @@ public class MessageFramerTest {
assertTrue(length < 1000);
assertEquals(frameCaptor.getAllValues().get(1).size(), length);
checkStats(1, length, 1000);
checkStats(length, 1000);
}
@Test
@ -290,7 +291,7 @@ public class MessageFramerTest {
assertEquals(1000, length);
assertEquals(buffer.data.length - 5 , length);
checkStats(1, 1000, 1000);
checkStats(1000, 1000);
}
@Test
@ -316,7 +317,7 @@ public class MessageFramerTest {
assertEquals(1000, length);
assertEquals(buffer.data.length - 5 , length);
checkStats(1, 1000, 1000);
checkStats(1000, 1000);
}
@Test
@ -345,7 +346,7 @@ public class MessageFramerTest {
writeKnownLength(framer, new byte[]{});
framer.flush();
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true);
checkStats(1, 0, 0);
checkStats(0, 0);
}
private static WritableBuffer toWriteBuffer(byte[] data) {
@ -367,13 +368,27 @@ public class MessageFramerTest {
// TODO(carl-mastrangelo): add framer.flush() here.
}
private void checkStats(int messagesSent, long wireBytesSent, long uncompressedBytesSent) {
long actualWireSize = 0;
long actualUncompressedSize = 0;
assertEquals(messagesSent, tracer.getOutboundMessageCount());
assertEquals(uncompressedBytesSent, tracer.getOutboundUncompressedSize());
assertEquals(wireBytesSent, tracer.getOutboundWireSize());
/**
* @param sizes in the format {wire0, uncompressed0, wire1, uncompressed1, ...}
*/
private void checkStats(long... sizes) {
assertEquals(0, sizes.length % 2);
int count = sizes.length / 2;
long expectedWireSize = 0;
long expectedUncompressedSize = 0;
for (int i = 0; i < count; i++) {
assertEquals("outboundMessage(" + i + ")", tracer.nextOutboundEvent());
assertEquals("outboundMessage()", tracer.nextOutboundEvent());
assertEquals(
String.format("outboundMessageSent(%d, %d, %d)", i, sizes[i * 2], sizes[i * 2 + 1]),
tracer.nextOutboundEvent());
expectedWireSize += sizes[i * 2];
expectedUncompressedSize += sizes[i * 2 + 1];
}
assertNull(tracer.nextOutboundEvent());
assertNull(tracer.nextInboundEvent());
assertEquals(expectedWireSize, tracer.getOutboundWireSize());
assertEquals(expectedUncompressedSize, tracer.getOutboundUncompressedSize());
}
static class ByteWritableBuffer implements WritableBuffer {

View File

@ -117,7 +117,7 @@ final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory {
}
@Override
public void inboundMessage() {
public void inboundMessage(int seqNo) {
anythingReceived.set(true);
}

View File

@ -1789,18 +1789,29 @@ public abstract class AbstractInteropTest {
TestStreamTracer tracer,
Collection<? extends MessageLite> sentMessages,
Collection<? extends MessageLite> receivedMessages) {
assertEquals(sentMessages.size(), tracer.getOutboundMessageCount());
assertEquals(receivedMessages.size(), tracer.getInboundMessageCount());
long uncompressedSentSize = 0;
int seqNo = 0;
for (MessageLite msg : sentMessages) {
assertThat(tracer.nextOutboundEvent()).isEqualTo(String.format("outboundMessage(%d)", seqNo));
assertThat(tracer.nextOutboundEvent()).isEqualTo("outboundMessage()");
assertThat(tracer.nextOutboundEvent()).matches(
String.format(
"outboundMessageSent\\(%d, -?[0-9]+, %d\\)", seqNo, msg.getSerializedSize()));
seqNo++;
uncompressedSentSize += msg.getSerializedSize();
}
assertNull(tracer.nextOutboundEvent());
long uncompressedReceivedSize = 0;
seqNo = 0;
for (MessageLite msg : receivedMessages) {
assertThat(tracer.nextInboundEvent()).isEqualTo(String.format("inboundMessage(%d)", seqNo));
assertThat(tracer.nextInboundEvent()).isEqualTo("inboundMessage()");
assertThat(tracer.nextInboundEvent()).matches(
String.format("inboundMessageRead\\(%d, -?[0-9]+, -?[0-9]+\\)", seqNo));
uncompressedReceivedSize += msg.getSerializedSize();
seqNo++;
}
assertNull(tracer.nextInboundEvent());
assertEquals(uncompressedSentSize, tracer.getOutboundUncompressedSize());
assertEquals(uncompressedReceivedSize, tracer.getInboundUncompressedSize());
}

View File

@ -732,7 +732,8 @@ public abstract class AbstractTransportTest {
assertTrue(clientStream.isReady());
clientStream.writeMessage(methodDescriptor.streamRequest("Hello!"));
if (metricsExpected()) {
assertThat(clientStreamTracer1.getOutboundMessageCount()).isGreaterThan(0);
assertThat(clientStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage(0)");
assertThat(clientStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage()");
}
clientStream.flush();
@ -740,10 +741,12 @@ public abstract class AbstractTransportTest {
assertEquals("Hello!", methodDescriptor.parseRequest(message));
message.close();
if (metricsExpected()) {
assertThat(clientStreamTracer1.getOutboundMessageCount()).isGreaterThan(0);
assertThat(clientStreamTracer1.nextOutboundEvent())
.matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
assertEquals(1, serverStreamTracer1.getInboundMessageCount());
assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage()");
}
assertNull("no additional message expected", serverStreamMessageQueue.poll());
@ -753,6 +756,8 @@ public abstract class AbstractTransportTest {
if (metricsExpected()) {
assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.nextInboundEvent())
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");
}
Metadata serverHeaders = new Metadata();
@ -774,7 +779,8 @@ public abstract class AbstractTransportTest {
assertTrue(serverStream.isReady());
serverStream.writeMessage(methodDescriptor.streamResponse("Hi. Who are you?"));
if (metricsExpected()) {
assertEquals(1, serverStreamTracer1.getOutboundMessageCount());
assertThat(serverStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage(0)");
assertThat(serverStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage()");
}
serverStream.flush();
@ -782,13 +788,18 @@ public abstract class AbstractTransportTest {
.messagesAvailable(any(StreamListener.MessageProducer.class));
message = clientStreamMessageQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (metricsExpected()) {
assertThat(serverStreamTracer1.nextOutboundEvent())
.matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
assertTrue(clientStreamTracer1.getInboundHeaders());
assertThat(clientStreamTracer1.getInboundMessageCount()).isGreaterThan(0);
assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage()");
}
assertEquals("Hi. Who are you?", methodDescriptor.parseResponse(message));
if (metricsExpected()) {
assertThat(clientStreamTracer1.nextInboundEvent())
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
}
@ -804,6 +815,8 @@ public abstract class AbstractTransportTest {
serverStream.close(status, trailers);
if (metricsExpected()) {
assertSame(status, serverStreamTracer1.getStatus());
assertNull(serverStreamTracer1.nextInboundEvent());
assertNull(serverStreamTracer1.nextOutboundEvent());
}
verify(mockServerStreamListener, timeout(TIMEOUT_MS)).closed(statusCaptor.capture());
assertCodeEquals(Status.OK, statusCaptor.getValue());
@ -811,6 +824,8 @@ public abstract class AbstractTransportTest {
.closed(statusCaptor.capture(), metadataCaptor.capture());
if (metricsExpected()) {
assertSame(statusCaptor.getValue(), clientStreamTracer1.getStatus());
assertNull(clientStreamTracer1.nextInboundEvent());
assertNull(clientStreamTracer1.nextOutboundEvent());
}
assertEquals(status.getCode(), statusCaptor.getValue().getCode());
assertEquals(status.getDescription(), statusCaptor.getValue().getDescription());
@ -1118,11 +1133,9 @@ public abstract class AbstractTransportTest {
if (metricsExpected()) {
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertTrue(clientStreamTracer1.getInboundHeaders());
assertEquals(1, clientStreamTracer1.getInboundMessageCount());
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertSame(status, clientStreamTracer1.getStatus());
assertEquals(1, serverStreamTracer1.getOutboundMessageCount());
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
// There is a race between client cancelling and server closing. The final status seen by the

View File

@ -53,11 +53,6 @@ public class TestClientStreamTracer extends ClientStreamTracer implements TestSt
return outboundHeadersCalled.get();
}
@Override
public int getInboundMessageCount() {
return delegate.getInboundMessageCount();
}
@Override
public Status getStatus() {
return delegate.getStatus();
@ -73,11 +68,6 @@ public class TestClientStreamTracer extends ClientStreamTracer implements TestSt
return delegate.getInboundUncompressedSize();
}
@Override
public int getOutboundMessageCount() {
return delegate.getOutboundMessageCount();
}
@Override
public long getOutboundWireSize() {
return delegate.getOutboundWireSize();
@ -88,6 +78,16 @@ public class TestClientStreamTracer extends ClientStreamTracer implements TestSt
return delegate.getOutboundUncompressedSize();
}
@Override
public String nextOutboundEvent() {
return delegate.nextOutboundEvent();
}
@Override
public String nextInboundEvent() {
return delegate.nextInboundEvent();
}
@Override
public void outboundWireSize(long bytes) {
delegate.outboundWireSize(bytes);
@ -114,15 +114,37 @@ public class TestClientStreamTracer extends ClientStreamTracer implements TestSt
}
@Override
@SuppressWarnings("deprecation")
public void inboundMessage() {
delegate.inboundMessage();
}
@Override
public void inboundMessage(int seqNo) {
delegate.inboundMessage(seqNo);
}
@Override
@SuppressWarnings("deprecation")
public void outboundMessage() {
delegate.outboundMessage();
}
@Override
public void outboundMessage(int seqNo) {
delegate.outboundMessage(seqNo);
}
@Override
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
delegate.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize);
}
@Override
public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
delegate.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize);
}
@Override
public void outboundHeaders() {
if (!outboundHeadersCalled.compareAndSet(false, true)

View File

@ -47,11 +47,6 @@ public class TestServerStreamTracer extends ServerStreamTracer implements TestSt
return serverCall.get();
}
@Override
public int getInboundMessageCount() {
return delegate.getInboundMessageCount();
}
@Override
public Status getStatus() {
return delegate.getStatus();
@ -67,11 +62,6 @@ public class TestServerStreamTracer extends ServerStreamTracer implements TestSt
return delegate.getInboundUncompressedSize();
}
@Override
public int getOutboundMessageCount() {
return delegate.getOutboundMessageCount();
}
@Override
public long getOutboundWireSize() {
return delegate.getOutboundWireSize();
@ -82,6 +72,16 @@ public class TestServerStreamTracer extends ServerStreamTracer implements TestSt
return delegate.getOutboundUncompressedSize();
}
@Override
public String nextOutboundEvent() {
return delegate.nextOutboundEvent();
}
@Override
public String nextInboundEvent() {
return delegate.nextInboundEvent();
}
@Override
public void outboundWireSize(long bytes) {
delegate.outboundWireSize(bytes);
@ -108,15 +108,37 @@ public class TestServerStreamTracer extends ServerStreamTracer implements TestSt
}
@Override
@SuppressWarnings("deprecation")
public void inboundMessage() {
delegate.inboundMessage();
}
@Override
public void inboundMessage(int seqNo) {
delegate.inboundMessage(seqNo);
}
@Override
@SuppressWarnings("deprecation")
public void outboundMessage() {
delegate.outboundMessage();
}
@Override
public void outboundMessage(int seqNo) {
delegate.outboundMessage(seqNo);
}
@Override
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
delegate.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize);
}
@Override
public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
delegate.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize);
}
@Override
public void serverCallStarted(ServerCall<?, ?> call) {
if (!serverCall.compareAndSet(null, call) && delegate.failDuplicateCallbacks.get()) {

View File

@ -19,11 +19,12 @@ package io.grpc.internal.testing;
import io.grpc.Status;
import io.grpc.StreamTracer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
/**
* A {@link StreamTracer} suitable for testing.
@ -40,16 +41,6 @@ public interface TestStreamTracer {
*/
boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException;
/**
* Returns how many times {@link StreamTracer#inboundMessage} has been called.
*/
int getInboundMessageCount();
/**
* Returns how many times {@link StreamTracer#outboundMessage} has been called.
*/
int getOutboundMessageCount();
/**
* Returns the status passed to {@link StreamTracer#streamClosed}.
*/
@ -75,6 +66,17 @@ public interface TestStreamTracer {
*/
long getOutboundUncompressedSize();
/**
* Returns the next captured outbound message event.
*/
@Nullable
String nextOutboundEvent();
/**
* Returns the next captured outbound message event.
*/
String nextInboundEvent();
/**
* A {@link StreamTracer} suitable for testing.
*/
@ -84,8 +86,8 @@ public interface TestStreamTracer {
protected final AtomicLong inboundWireSize = new AtomicLong();
protected final AtomicLong outboundUncompressedSize = new AtomicLong();
protected final AtomicLong inboundUncompressedSize = new AtomicLong();
protected final AtomicInteger inboundMessageCount = new AtomicInteger();
protected final AtomicInteger outboundMessageCount = new AtomicInteger();
protected final LinkedBlockingQueue<String> outboundEvents = new LinkedBlockingQueue<String>();
protected final LinkedBlockingQueue<String> inboundEvents = new LinkedBlockingQueue<String>();
protected final AtomicReference<Status> streamClosedStatus = new AtomicReference<Status>();
protected final CountDownLatch streamClosed = new CountDownLatch(1);
protected final AtomicBoolean failDuplicateCallbacks = new AtomicBoolean(true);
@ -100,16 +102,6 @@ public interface TestStreamTracer {
return streamClosed.await(timeout, timeUnit);
}
@Override
public int getInboundMessageCount() {
return inboundMessageCount.get();
}
@Override
public int getOutboundMessageCount() {
return outboundMessageCount.get();
}
@Override
public Status getStatus() {
return streamClosedStatus.get();
@ -167,13 +159,52 @@ public interface TestStreamTracer {
}
@Override
@SuppressWarnings("deprecation")
public void inboundMessage() {
inboundMessageCount.incrementAndGet();
inboundEvents.add("inboundMessage()");
}
@Override
public void inboundMessage(int seqNo) {
inboundEvents.add("inboundMessage(" + seqNo + ")");
}
@Override
@SuppressWarnings("deprecation")
public void outboundMessage() {
outboundMessageCount.incrementAndGet();
outboundEvents.add("outboundMessage()");
}
@Override
public void outboundMessage(int seqNo) {
outboundEvents.add("outboundMessage(" + seqNo + ")");
}
@Override
public void outboundMessageSent(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
outboundEvents.add(
String.format(
"outboundMessageSent(%d, %d, %d)",
seqNo, optionalWireSize, optionalUncompressedSize));
}
@Override
public void inboundMessageRead(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
inboundEvents.add(
String.format(
"inboundMessageRead(%d, %d, %d)", seqNo, optionalWireSize, optionalUncompressedSize));
}
@Override
public String nextOutboundEvent() {
return outboundEvents.poll();
}
@Override
public String nextInboundEvent() {
return inboundEvents.poll();
}
}
}