mirror of https://github.com/grpc/grpc-java.git
core, services: make BinaryLog an explicit object that is passed into channels/servers (#4431)
remove SPI, io.grpc.BinaryLog is a public API that is passed into builders and must be explicitly closed.
This commit is contained in:
parent
46079fff8a
commit
6f29b60dcf
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Copyright 2018, gRPC Authors All rights reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A binary log that can be installed on a channel or server. {@link #close} must be called after
|
||||
* all the servers and channels associated with the binary log are terminated.
|
||||
*/
|
||||
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4017")
|
||||
public class BinaryLog implements Closeable {
|
||||
final BinaryLogProvider surrogate;
|
||||
|
||||
BinaryLog(BinaryLogProvider surrogate) {
|
||||
Preconditions.checkNotNull(surrogate);
|
||||
this.surrogate = surrogate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
surrogate.close();
|
||||
}
|
||||
}
|
|
@ -14,33 +14,24 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.internal;
|
||||
package io.grpc;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientCall;
|
||||
import io.grpc.ClientInterceptor;
|
||||
import io.grpc.ClientInterceptors;
|
||||
import io.grpc.InternalClientInterceptors;
|
||||
import io.grpc.InternalServerInterceptors;
|
||||
import io.grpc.InternalServiceProviders;
|
||||
import io.grpc.InternalServiceProviders.PriorityAccessor;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.grpc.MethodDescriptor.Marshaller;
|
||||
import io.grpc.ServerCallHandler;
|
||||
import io.grpc.ServerInterceptor;
|
||||
import io.grpc.ServerMethodDefinition;
|
||||
import io.opencensus.trace.Span;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collections;
|
||||
import java.util.logging.Logger;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
// TODO(zpencer): rename class to AbstractBinaryLog
|
||||
@Internal
|
||||
public abstract class BinaryLogProvider implements Closeable {
|
||||
public static final CallOptions.Key<CallId> CLIENT_CALL_ID_CALLOPTION_KEY
|
||||
= CallOptions.Key.of("binarylog-calloptions-key", null);
|
||||
|
@ -48,32 +39,9 @@ public abstract class BinaryLogProvider implements Closeable {
|
|||
public static final Marshaller<byte[]> BYTEARRAY_MARSHALLER = new ByteArrayMarshaller();
|
||||
|
||||
private static final Logger logger = Logger.getLogger(BinaryLogProvider.class.getName());
|
||||
private static final BinaryLogProvider PROVIDER = InternalServiceProviders.load(
|
||||
BinaryLogProvider.class,
|
||||
Collections.<Class<?>>emptyList(),
|
||||
BinaryLogProvider.class.getClassLoader(),
|
||||
new PriorityAccessor<BinaryLogProvider>() {
|
||||
@Override
|
||||
public boolean isAvailable(BinaryLogProvider provider) {
|
||||
return provider.isAvailable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPriority(BinaryLogProvider provider) {
|
||||
return provider.priority();
|
||||
}
|
||||
});
|
||||
|
||||
private final ClientInterceptor binaryLogShim = new BinaryLogShim();
|
||||
|
||||
/**
|
||||
* Returns a {@code BinaryLogProvider}, or {@code null} if there is no provider.
|
||||
*/
|
||||
@Nullable
|
||||
public static BinaryLogProvider provider() {
|
||||
return PROVIDER;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a channel to provide binary logging on {@link ClientCall}s as needed.
|
||||
*/
|
||||
|
@ -132,20 +100,6 @@ public abstract class BinaryLogProvider implements Closeable {
|
|||
// TODO(zpencer): make BinaryLogProvider provide a BinaryLog, and this method belongs there
|
||||
}
|
||||
|
||||
/**
|
||||
* A priority, from 0 to 10 that this provider should be used, taking the current environment into
|
||||
* consideration. 5 should be considered the default, and then tweaked based on environment
|
||||
* detection. A priority of 0 does not imply that the provider wouldn't work; just that it should
|
||||
* be last in line.
|
||||
*/
|
||||
protected abstract int priority();
|
||||
|
||||
/**
|
||||
* Whether this provider is available for use, taking the current environment into consideration.
|
||||
* If {@code false}, no other methods are safe to be called.
|
||||
*/
|
||||
protected abstract boolean isAvailable();
|
||||
|
||||
// Creating a named class makes debugging easier
|
||||
private static final class ByteArrayMarshaller implements Marshaller<byte[]> {
|
||||
@Override
|
||||
|
@ -172,7 +126,7 @@ public abstract class BinaryLogProvider implements Closeable {
|
|||
}
|
||||
|
||||
/**
|
||||
* The pipeline of interceptors is hard coded when the {@link ManagedChannelImpl} is created.
|
||||
* The pipeline of interceptors is hard coded when the {@link ManagedChannel} is created.
|
||||
* This shim interceptor should always be installed as a placeholder. When a call starts,
|
||||
* this interceptor checks with the {@link BinaryLogProvider} to see if logging should happen
|
||||
* for this particular {@link ClientCall}'s method.
|
||||
|
@ -218,4 +172,35 @@ public abstract class BinaryLogProvider implements Closeable {
|
|||
return new CallId(0, ByteBuffer.wrap(span.getContext().getSpanId().getBytes()).getLong());
|
||||
}
|
||||
}
|
||||
|
||||
// Copied from internal
|
||||
private static final class IoUtils {
|
||||
/** maximum buffer to be read is 16 KB. */
|
||||
private static final int MAX_BUFFER_LENGTH = 16384;
|
||||
|
||||
/** Returns the byte array. */
|
||||
public static byte[] toByteArray(InputStream in) throws IOException {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
copy(in, out);
|
||||
return out.toByteArray();
|
||||
}
|
||||
|
||||
/** Copies the data from input stream to output stream. */
|
||||
public static long copy(InputStream from, OutputStream to) throws IOException {
|
||||
// Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
|
||||
Preconditions.checkNotNull(from);
|
||||
Preconditions.checkNotNull(to);
|
||||
byte[] buf = new byte[MAX_BUFFER_LENGTH];
|
||||
long total = 0;
|
||||
while (true) {
|
||||
int r = from.read(buf);
|
||||
if (r == -1) {
|
||||
break;
|
||||
}
|
||||
to.write(buf, 0, r);
|
||||
total += r;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Copyright 2018, gRPC Authors All rights reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc;
|
||||
|
||||
@Internal
|
||||
public final class InternalBinaryLogs {
|
||||
public static <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
|
||||
BinaryLog binaryLog, ServerMethodDefinition<ReqT, RespT> oMethodDef) {
|
||||
return binaryLog.surrogate.wrapMethodDefinition(oMethodDef);
|
||||
}
|
||||
|
||||
public static Channel wrapChannel(BinaryLog binaryLog, Channel channel) {
|
||||
return binaryLog.surrogate.wrapChannel(channel);
|
||||
}
|
||||
|
||||
public static BinaryLog createBinaryLog(BinaryLogProvider surrogate) {
|
||||
return new BinaryLog(surrogate);
|
||||
}
|
||||
|
||||
private InternalBinaryLogs() {}
|
||||
}
|
|
@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.BinaryLog;
|
||||
import io.grpc.ClientInterceptor;
|
||||
import io.grpc.CompressorRegistry;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
|
@ -140,7 +141,7 @@ public abstract class AbstractManagedChannelImplBuilder
|
|||
private int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
|
||||
|
||||
@Nullable
|
||||
BinaryLogProvider binlogProvider = BinaryLogProvider.provider();
|
||||
BinaryLog binlog;
|
||||
|
||||
/**
|
||||
* Sets the maximum message size allowed for a single gRPC frame. If an inbound messages
|
||||
|
|
|
@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import io.grpc.BinaryLog;
|
||||
import io.grpc.BindableService;
|
||||
import io.grpc.CompressorRegistry;
|
||||
import io.grpc.Context;
|
||||
|
@ -107,7 +108,8 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
|
|||
private boolean recordFinishedRpcs = true;
|
||||
private boolean tracingEnabled = true;
|
||||
|
||||
protected BinaryLogProvider binlogProvider = BinaryLogProvider.provider();
|
||||
@Nullable
|
||||
protected BinaryLog binlog;
|
||||
protected TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory();
|
||||
|
||||
protected Channelz channelz = Channelz.instance();
|
||||
|
|
|
@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||
import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.grpc.BinaryLogProvider;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientCall;
|
||||
|
|
|
@ -31,6 +31,7 @@ import com.google.common.base.Supplier;
|
|||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.BinaryLogProvider;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientCall;
|
||||
|
@ -43,6 +44,7 @@ import io.grpc.ConnectivityStateInfo;
|
|||
import io.grpc.Context;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
import io.grpc.EquivalentAddressGroup;
|
||||
import io.grpc.InternalBinaryLogs;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancer.PickResult;
|
||||
import io.grpc.LoadBalancer.PickSubchannelArgs;
|
||||
|
@ -537,8 +539,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
serviceConfigInterceptor = new ServiceConfigInterceptor(retryEnabled, builder.maxRetryAttempts);
|
||||
Channel channel = new RealChannel();
|
||||
channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor);
|
||||
if (builder.binlogProvider != null) {
|
||||
channel = builder.binlogProvider.wrapChannel(channel);
|
||||
if (builder.binlog != null) {
|
||||
channel = InternalBinaryLogs.wrapChannel(builder.binlog, channel);
|
||||
}
|
||||
this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
|
||||
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
|
||||
|
|
|
@ -30,11 +30,13 @@ import com.google.common.base.Preconditions;
|
|||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.BinaryLog;
|
||||
import io.grpc.CompressorRegistry;
|
||||
import io.grpc.Context;
|
||||
import io.grpc.Decompressor;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
import io.grpc.HandlerRegistry;
|
||||
import io.grpc.InternalBinaryLogs;
|
||||
import io.grpc.InternalServerInterceptors;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.ServerCall;
|
||||
|
@ -108,7 +110,7 @@ public final class ServerImpl extends io.grpc.Server implements Instrumented<Ser
|
|||
|
||||
private final DecompressorRegistry decompressorRegistry;
|
||||
private final CompressorRegistry compressorRegistry;
|
||||
private final BinaryLogProvider binlogProvider;
|
||||
private final BinaryLog binlog;
|
||||
|
||||
private final Channelz channelz;
|
||||
private final CallTracer serverCallTracer;
|
||||
|
@ -139,7 +141,7 @@ public final class ServerImpl extends io.grpc.Server implements Instrumented<Ser
|
|||
this.interceptors =
|
||||
builder.interceptors.toArray(new ServerInterceptor[builder.interceptors.size()]);
|
||||
this.handshakeTimeoutMillis = builder.handshakeTimeoutMillis;
|
||||
this.binlogProvider = builder.binlogProvider;
|
||||
this.binlog = builder.binlog;
|
||||
this.channelz = builder.channelz;
|
||||
this.serverCallTracer = builder.callTracerFactory.create();
|
||||
|
||||
|
@ -535,8 +537,8 @@ public final class ServerImpl extends io.grpc.Server implements Instrumented<Ser
|
|||
handler = InternalServerInterceptors.interceptCallHandler(interceptor, handler);
|
||||
}
|
||||
ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
|
||||
ServerMethodDefinition<?, ?> wMethodDef = binlogProvider == null
|
||||
? interceptedDef : binlogProvider.wrapMethodDefinition(interceptedDef);
|
||||
ServerMethodDefinition<?, ?> wMethodDef = binlog == null
|
||||
? interceptedDef : InternalBinaryLogs.wrapMethodDefinition(binlog, interceptedDef);
|
||||
return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context);
|
||||
}
|
||||
|
||||
|
|
|
@ -14,49 +14,27 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.internal;
|
||||
package io.grpc;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientCall;
|
||||
import io.grpc.ClientInterceptor;
|
||||
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
|
||||
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
|
||||
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
|
||||
import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener;
|
||||
import io.grpc.IntegerMarshaller;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.MethodDescriptor.Marshaller;
|
||||
import io.grpc.MethodDescriptor.MethodType;
|
||||
import io.grpc.ServerCall;
|
||||
import io.grpc.ServerCallHandler;
|
||||
import io.grpc.ServerInterceptor;
|
||||
import io.grpc.ServerMethodDefinition;
|
||||
import io.grpc.StringMarshaller;
|
||||
import io.grpc.internal.BinaryLogProvider.CallId;
|
||||
import io.grpc.internal.testing.StatsTestUtils.MockableSpan;
|
||||
import io.grpc.testing.TestMethodDescriptors;
|
||||
import io.opencensus.trace.Span;
|
||||
import io.opencensus.trace.SpanBuilder;
|
||||
import io.opencensus.trace.Tracer;
|
||||
import io.opencensus.trace.propagation.BinaryFormat;
|
||||
import io.grpc.internal.IoUtils;
|
||||
import io.grpc.internal.NoopClientCall;
|
||||
import io.grpc.internal.NoopServerCall;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
@ -102,20 +80,6 @@ public class BinaryLogProviderTest {
|
|||
String fullMethodName, CallOptions callOptions) {
|
||||
return new TestBinaryLogClientInterceptor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() { }
|
||||
|
||||
|
||||
@Override
|
||||
protected int priority() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isAvailable() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
@Test
|
||||
|
@ -302,48 +266,6 @@ public class BinaryLogProviderTest {
|
|||
(int) method.parseResponse(new ByteArrayInputStream((byte[]) serializedResp.get(0))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void callIdFromSpan() {
|
||||
MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0));
|
||||
CallId callId = CallId.fromCensusSpan(mockableSpan);
|
||||
assertThat(callId.hi).isEqualTo(0);
|
||||
assertThat(callId.lo)
|
||||
.isEqualTo(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void censusTracerSetsCallId() throws Exception {
|
||||
Tracer tracer = mock(Tracer.class);
|
||||
SpanBuilder builder = mock(SpanBuilder.class);
|
||||
when(tracer.spanBuilderWithExplicitParent(any(String.class), any(Span.class)))
|
||||
.thenReturn(builder);
|
||||
when(builder.setRecordEvents(any(Boolean.class))).thenReturn(builder);
|
||||
MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0));
|
||||
when(builder.startSpan()).thenReturn(mockableSpan);
|
||||
|
||||
final SettableFuture<CallOptions> options = SettableFuture.create();
|
||||
Channel c = new Channel() {
|
||||
@Override
|
||||
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
|
||||
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
|
||||
options.set(callOptions);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String authority() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
new CensusTracingModule(tracer, mock(BinaryFormat.class))
|
||||
.getClientInterceptor()
|
||||
.interceptCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT, c);
|
||||
CallId callId = options.get().getOption(BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY);
|
||||
assertThat(callId.hi).isEqualTo(0);
|
||||
assertThat(callId.lo)
|
||||
.isEqualTo(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong());
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
private static void onServerMessageHelper(ServerCall.Listener listener, Object request) {
|
||||
listener.onMessage(request);
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
package io.grpc.internal;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY;
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -32,6 +33,7 @@ import static org.mockito.Matchers.eq;
|
|||
import static org.mockito.Matchers.isNull;
|
||||
import static org.mockito.Matchers.same;
|
||||
import static org.mockito.Mockito.inOrder;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
@ -40,7 +42,10 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
|
|||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.BinaryLogProvider;
|
||||
import io.grpc.BinaryLogProvider.CallId;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientCall;
|
||||
|
@ -61,6 +66,7 @@ import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer;
|
|||
import io.grpc.internal.testing.StatsTestUtils.FakeTagger;
|
||||
import io.grpc.internal.testing.StatsTestUtils.MockableSpan;
|
||||
import io.grpc.testing.GrpcServerRule;
|
||||
import io.grpc.testing.TestMethodDescriptors;
|
||||
import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
|
||||
import io.opencensus.tags.TagContext;
|
||||
import io.opencensus.tags.TagValue;
|
||||
|
@ -76,6 +82,7 @@ import io.opencensus.trace.propagation.BinaryFormat;
|
|||
import io.opencensus.trace.propagation.SpanContextParseException;
|
||||
import io.opencensus.trace.unsafe.ContextUtils;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -949,6 +956,54 @@ public class CensusModulesTest {
|
|||
"Recv.io.grpc.Bar", CensusTracingModule.generateTraceSpanName(true, "io.grpc/Bar"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests integration with binary logging.
|
||||
*/
|
||||
@Test
|
||||
public void callIdFromSpan() {
|
||||
MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0));
|
||||
CallId callId = CallId.fromCensusSpan(mockableSpan);
|
||||
assertThat(callId.hi).isEqualTo(0);
|
||||
assertThat(callId.lo)
|
||||
.isEqualTo(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests integration with binary logging.
|
||||
*/
|
||||
@Test
|
||||
public void censusTracerSetsCallId() throws Exception {
|
||||
Tracer tracer = mock(Tracer.class);
|
||||
SpanBuilder builder = mock(SpanBuilder.class);
|
||||
when(tracer.spanBuilderWithExplicitParent(any(String.class), any(Span.class)))
|
||||
.thenReturn(builder);
|
||||
when(builder.setRecordEvents(any(Boolean.class))).thenReturn(builder);
|
||||
MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0));
|
||||
when(builder.startSpan()).thenReturn(mockableSpan);
|
||||
|
||||
final SettableFuture<CallOptions> options = SettableFuture.create();
|
||||
Channel c = new Channel() {
|
||||
@Override
|
||||
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
|
||||
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
|
||||
options.set(callOptions);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String authority() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
new CensusTracingModule(tracer, mock(BinaryFormat.class))
|
||||
.getClientInterceptor()
|
||||
.interceptCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT, c);
|
||||
CallId callId = options.get().getOption(BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY);
|
||||
assertThat(callId.hi).isEqualTo(0);
|
||||
assertThat(callId.lo)
|
||||
.isEqualTo(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong());
|
||||
}
|
||||
|
||||
private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) {
|
||||
assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT));
|
||||
assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT));
|
||||
|
|
|
@ -52,6 +52,7 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.BinaryLogProvider;
|
||||
import io.grpc.CallCredentials;
|
||||
import io.grpc.CallCredentials.MetadataApplier;
|
||||
import io.grpc.CallOptions;
|
||||
|
@ -64,6 +65,7 @@ import io.grpc.ConnectivityStateInfo;
|
|||
import io.grpc.Context;
|
||||
import io.grpc.EquivalentAddressGroup;
|
||||
import io.grpc.IntegerMarshaller;
|
||||
import io.grpc.InternalBinaryLogs;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancer.Helper;
|
||||
import io.grpc.LoadBalancer.PickResult;
|
||||
|
@ -244,7 +246,7 @@ public class ManagedChannelImplTest {
|
|||
.userAgent(USER_AGENT)
|
||||
.idleTimeout(AbstractManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS);
|
||||
channelBuilder.executorPool = executorPool;
|
||||
channelBuilder.binlogProvider = null;
|
||||
channelBuilder.binlog = null;
|
||||
channelBuilder.channelz = channelz;
|
||||
}
|
||||
|
||||
|
@ -2281,7 +2283,7 @@ public class ManagedChannelImplTest {
|
|||
@Test
|
||||
public void binaryLogInstalled() throws Exception {
|
||||
final SettableFuture<Boolean> intercepted = SettableFuture.create();
|
||||
channelBuilder.binlogProvider = new BinaryLogProvider() {
|
||||
channelBuilder.binlog = InternalBinaryLogs.createBinaryLog(new BinaryLogProvider() {
|
||||
@Nullable
|
||||
@Override
|
||||
public ServerInterceptor getServerInterceptor(String fullMethodName) {
|
||||
|
@ -2302,17 +2304,7 @@ public class ManagedChannelImplTest {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int priority() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isAvailable() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
createChannel();
|
||||
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
|
||||
|
|
|
@ -48,6 +48,7 @@ import com.google.common.util.concurrent.ListenableFuture;
|
|||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.BinaryLogProvider;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.ClientInterceptor;
|
||||
import io.grpc.Compressor;
|
||||
|
@ -55,6 +56,7 @@ import io.grpc.Context;
|
|||
import io.grpc.Grpc;
|
||||
import io.grpc.HandlerRegistry;
|
||||
import io.grpc.IntegerMarshaller;
|
||||
import io.grpc.InternalBinaryLogs;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.ServerCall;
|
||||
|
@ -1236,7 +1238,7 @@ public class ServerImplTest {
|
|||
@Test
|
||||
public void binaryLogInstalled() throws Exception {
|
||||
final SettableFuture<Boolean> intercepted = SettableFuture.create();
|
||||
builder.binlogProvider = new BinaryLogProvider() {
|
||||
builder.binlog = InternalBinaryLogs.createBinaryLog(new BinaryLogProvider() {
|
||||
@Nullable
|
||||
@Override
|
||||
public ServerInterceptor getServerInterceptor(String fullMethodName) {
|
||||
|
@ -1257,17 +1259,7 @@ public class ServerImplTest {
|
|||
String fullMethodName, CallOptions callOptions) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int priority() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isAvailable() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
});
|
||||
createAndStartServer();
|
||||
basicExchangeHelper(METHOD, "Lots of pizza, please", 314, 50);
|
||||
assertTrue(intercepted.get());
|
||||
|
|
|
@ -16,11 +16,10 @@
|
|||
|
||||
package io.grpc.services;
|
||||
|
||||
import io.grpc.BinaryLogProvider;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.ClientInterceptor;
|
||||
import io.grpc.ExperimentalApi;
|
||||
import io.grpc.ServerInterceptor;
|
||||
import io.grpc.internal.BinaryLogProvider;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
@ -29,8 +28,7 @@ import javax.annotation.Nullable;
|
|||
/**
|
||||
* The default implementation of a {@link BinaryLogProvider}.
|
||||
*/
|
||||
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4017")
|
||||
public class BinaryLogProviderImpl extends BinaryLogProvider {
|
||||
class BinaryLogProviderImpl extends BinaryLogProvider {
|
||||
private static final Logger logger = Logger.getLogger(BinaryLogProviderImpl.class.getName());
|
||||
private final BinlogHelper.Factory factory;
|
||||
private final AtomicLong counter = new AtomicLong();
|
||||
|
@ -64,16 +62,6 @@ public class BinaryLogProviderImpl extends BinaryLogProvider {
|
|||
return factory.getLog(fullMethodName).getClientInterceptor(getClientCallId(callOptions));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int priority() {
|
||||
return 5;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isAvailable() {
|
||||
return factory != null;
|
||||
}
|
||||
|
||||
protected CallId getServerCallId() {
|
||||
return new CallId(0, counter.getAndIncrement());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright 2018, gRPC Authors All rights reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.services;
|
||||
|
||||
import io.grpc.BinaryLog;
|
||||
import io.grpc.ExperimentalApi;
|
||||
import io.grpc.InternalBinaryLogs;
|
||||
|
||||
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4017")
|
||||
public final class BinaryLogs {
|
||||
public static BinaryLog createBinaryLog() {
|
||||
return InternalBinaryLogs.createBinaryLog(new BinaryLogProviderImpl());
|
||||
}
|
||||
|
||||
public static BinaryLog createCensusBinaryLog() {
|
||||
return InternalBinaryLogs.createBinaryLog(new CensusBinaryLogProvider());
|
||||
}
|
||||
|
||||
private BinaryLogs() {}
|
||||
}
|
|
@ -16,7 +16,7 @@
|
|||
|
||||
package io.grpc.services;
|
||||
|
||||
import static io.grpc.internal.BinaryLogProvider.BYTEARRAY_MARSHALLER;
|
||||
import static io.grpc.BinaryLogProvider.BYTEARRAY_MARSHALLER;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -24,6 +24,7 @@ import com.google.common.base.Splitter;
|
|||
import com.google.common.primitives.Bytes;
|
||||
import com.google.protobuf.ByteString;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.BinaryLogProvider.CallId;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientCall;
|
||||
|
@ -50,7 +51,6 @@ import io.grpc.binarylog.MetadataEntry;
|
|||
import io.grpc.binarylog.Peer;
|
||||
import io.grpc.binarylog.Peer.PeerType;
|
||||
import io.grpc.binarylog.Uint128;
|
||||
import io.grpc.internal.BinaryLogProvider.CallId;
|
||||
import java.net.Inet4Address;
|
||||
import java.net.Inet6Address;
|
||||
import java.net.InetAddress;
|
||||
|
|
|
@ -14,20 +14,14 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.services.internal;
|
||||
package io.grpc.services;
|
||||
|
||||
import io.grpc.BinaryLogProvider;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.internal.BinaryLogProvider;
|
||||
import io.grpc.services.BinaryLogProviderImpl;
|
||||
import io.opencensus.trace.Span;
|
||||
import io.opencensus.trace.Tracing;
|
||||
|
||||
public final class CensusBinaryLogProvider extends BinaryLogProviderImpl {
|
||||
@Override
|
||||
protected int priority() {
|
||||
return 6;
|
||||
}
|
||||
|
||||
final class CensusBinaryLogProvider extends BinaryLogProviderImpl {
|
||||
@Override
|
||||
protected CallId getServerCallId() {
|
||||
Span currentSpan = Tracing.getTracer().getCurrentSpan();
|
|
@ -16,7 +16,7 @@
|
|||
|
||||
package io.grpc.services;
|
||||
|
||||
import static io.grpc.internal.BinaryLogProvider.BYTEARRAY_MARSHALLER;
|
||||
import static io.grpc.BinaryLogProvider.BYTEARRAY_MARSHALLER;
|
||||
import static io.grpc.services.BinlogHelper.DUMMY_SOCKET;
|
||||
import static io.grpc.services.BinlogHelper.getPeerSocket;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -31,6 +31,8 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
|
|||
import com.google.common.primitives.Bytes;
|
||||
import com.google.protobuf.ByteString;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.BinaryLogProvider;
|
||||
import io.grpc.BinaryLogProvider.CallId;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientCall;
|
||||
|
@ -47,8 +49,6 @@ import io.grpc.binarylog.MetadataEntry;
|
|||
import io.grpc.binarylog.Peer;
|
||||
import io.grpc.binarylog.Peer.PeerType;
|
||||
import io.grpc.binarylog.Uint128;
|
||||
import io.grpc.internal.BinaryLogProvider;
|
||||
import io.grpc.internal.BinaryLogProvider.CallId;
|
||||
import io.grpc.internal.NoopClientCall;
|
||||
import io.grpc.internal.NoopServerCall;
|
||||
import io.grpc.services.BinlogHelper.FactoryImpl;
|
||||
|
|
|
@ -14,16 +14,17 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.services.internal;
|
||||
package io.grpc.services;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY;
|
||||
|
||||
import io.grpc.BinaryLogProvider;
|
||||
import io.grpc.BinaryLogProvider.CallId;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Context;
|
||||
import io.grpc.internal.BinaryLogProvider;
|
||||
import io.grpc.internal.BinaryLogProvider.CallId;
|
||||
import io.grpc.internal.testing.StatsTestUtils.MockableSpan;
|
||||
import io.grpc.services.CensusBinaryLogProvider;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Random;
|
||||
import org.junit.Test;
|
Loading…
Reference in New Issue