mirror of https://github.com/grpc/grpc-java.git
census: add delayed name resolution tracing annotation (#10044)
This commit is contained in:
parent
a702397903
commit
14ba959545
|
@ -27,6 +27,12 @@ import javax.annotation.concurrent.ThreadSafe;
|
|||
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2861")
|
||||
@ThreadSafe
|
||||
public abstract class ClientStreamTracer extends StreamTracer {
|
||||
/**
|
||||
* The call was delayed due to waiting for name resolution result.
|
||||
*/
|
||||
public static final CallOptions.Key<Boolean> NAME_RESOLUTION_DELAYED =
|
||||
CallOptions.Key.createWithDefault("io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED",
|
||||
false);
|
||||
|
||||
/**
|
||||
* The stream is being created on a ready transport.
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package io.grpc.census;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
|
||||
import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -269,6 +270,9 @@ final class CensusTracingModule {
|
|||
"previous-rpc-attempts", AttributeValue.longAttributeValue(info.getPreviousAttempts()));
|
||||
attemptSpan.putAttribute(
|
||||
"transparent-retry", AttributeValue.booleanAttributeValue(info.isTransparentRetry()));
|
||||
if (info.getCallOptions().getOption(NAME_RESOLUTION_DELAYED)) {
|
||||
span.addAnnotation("Delayed name resolution complete");
|
||||
}
|
||||
return new ClientTracer(attemptSpan, span, tracingHeader, isSampledToLocalTracing);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ package io.grpc.census;
|
|||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static com.google.common.truth.Truth.assertWithMessage;
|
||||
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
|
||||
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRIES_PER_CALL;
|
||||
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRY_DELAY_PER_CALL;
|
||||
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.TRANSPARENT_RETRIES_PER_CALL;
|
||||
|
@ -132,7 +133,8 @@ public class CensusModulesTest {
|
|||
private static final CallOptions CALL_OPTIONS =
|
||||
CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue");
|
||||
private static final ClientStreamTracer.StreamInfo STREAM_INFO =
|
||||
ClientStreamTracer.StreamInfo.newBuilder().build();
|
||||
ClientStreamTracer.StreamInfo.newBuilder()
|
||||
.setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, true)).build();
|
||||
|
||||
private static class StringInputStream extends InputStream {
|
||||
final String string;
|
||||
|
@ -768,6 +770,7 @@ public class CensusModulesTest {
|
|||
.putAttribute("previous-rpc-attempts", AttributeValue.longAttributeValue(0));
|
||||
inOrder.verify(spyAttemptSpan)
|
||||
.putAttribute("transparent-retry", AttributeValue.booleanAttributeValue(false));
|
||||
inOrder.verify(spyClientSpan).addAnnotation("Delayed name resolution complete");
|
||||
inOrder.verify(spyAttemptSpan).addAnnotation("Delayed LB pick complete");
|
||||
inOrder.verify(spyAttemptSpan, times(2)).addMessageEvent(messageEventCaptor.capture());
|
||||
List<MessageEvent> events = messageEventCaptor.getAllValues();
|
||||
|
|
|
@ -19,6 +19,7 @@ package io.grpc.internal;
|
|||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
|
||||
import static io.grpc.ConnectivityState.IDLE;
|
||||
import static io.grpc.ConnectivityState.SHUTDOWN;
|
||||
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
||||
|
@ -1085,7 +1086,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
ClientCall<ReqT, RespT> realCall;
|
||||
Context previous = context.attach();
|
||||
try {
|
||||
realCall = newClientCall(method, callOptions);
|
||||
CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED, true);
|
||||
realCall = newClientCall(method, delayResolutionOption);
|
||||
} finally {
|
||||
context.detach(previous);
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package io.grpc.internal;
|
|||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static com.google.common.truth.Truth.assertWithMessage;
|
||||
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
|
||||
import static io.grpc.ConnectivityState.CONNECTING;
|
||||
import static io.grpc.ConnectivityState.IDLE;
|
||||
import static io.grpc.ConnectivityState.READY;
|
||||
|
@ -1071,6 +1072,50 @@ public class ManagedChannelImplTest {
|
|||
verifyPanicMode(ex);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void delayedNameResolution() {
|
||||
ClientStream mockStream = mock(ClientStream.class);
|
||||
final ClientStreamTracer tracer = new ClientStreamTracer() {};
|
||||
ClientStreamTracer.Factory factory = new ClientStreamTracer.Factory() {
|
||||
@Override
|
||||
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
|
||||
return tracer;
|
||||
}
|
||||
};
|
||||
FakeNameResolverFactory nsFactory = new FakeNameResolverFactory.Builder(expectedUri)
|
||||
.setResolvedAtStart(false).build();
|
||||
channelBuilder.nameResolverFactory(nsFactory);
|
||||
createChannel();
|
||||
|
||||
CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory);
|
||||
ClientCall<String, Integer> call = channel.newCall(method, callOptions);
|
||||
call.start(mockCallListener, new Metadata());
|
||||
|
||||
nsFactory.allResolved();
|
||||
Subchannel subchannel =
|
||||
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
|
||||
requestConnectionSafely(helper, subchannel);
|
||||
MockClientTransportInfo transportInfo = transports.poll();
|
||||
transportInfo.listener.transportReady();
|
||||
ClientTransport mockTransport = transportInfo.transport;
|
||||
when(mockTransport.newStream(
|
||||
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
|
||||
ArgumentMatchers.<ClientStreamTracer[]>any()))
|
||||
.thenReturn(mockStream);
|
||||
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
|
||||
PickResult.withSubchannel(subchannel));
|
||||
|
||||
updateBalancingStateSafely(helper, READY, mockPicker);
|
||||
assertEquals(2, executor.runDueTasks());
|
||||
|
||||
verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
|
||||
verify(mockTransport).newStream(
|
||||
same(method), any(Metadata.class), callOptionsCaptor.capture(),
|
||||
tracersCaptor.capture());
|
||||
assertThat(Arrays.asList(tracersCaptor.getValue()).contains(tracer)).isTrue();
|
||||
assertThat(callOptionsCaptor.getValue().getOption(NAME_RESOLUTION_DELAYED)).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void nameResolvedAfterChannelShutdown() {
|
||||
// Delay the success of name resolution until allResolved() is called.
|
||||
|
|
Loading…
Reference in New Issue