protobuf{,lite,nano}: make more classes final

- Split anonymous classes in named and final classes
- Fix some Javadocs and annotate when things were added.
This commit is contained in:
Carl Mastrangelo 2018-06-19 15:30:12 -07:00 committed by GitHub
parent 49d7e5fd0b
commit 13ca42aff6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 233 additions and 188 deletions

View File

@ -30,7 +30,7 @@ import javax.annotation.Nullable;
/**
* An {@link InputStream} backed by a protobuf.
*/
class ProtoInputStream extends InputStream implements Drainable, KnownLength {
final class ProtoInputStream extends InputStream implements Drainable, KnownLength {
// ProtoInputStream is first initialized with a *message*. *partial* is initially null.
// Once there has been a read operation on this stream, *message* is serialized to *partial* and
@ -39,7 +39,7 @@ class ProtoInputStream extends InputStream implements Drainable, KnownLength {
private final Parser<?> parser;
@Nullable private ByteArrayInputStream partial;
public ProtoInputStream(MessageLite message, Parser<?> parser) {
ProtoInputStream(MessageLite message, Parser<?> parser) {
this.message = message;
this.parser = parser;
}
@ -103,7 +103,7 @@ class ProtoInputStream extends InputStream implements Drainable, KnownLength {
}
@Override
public int available() throws IOException {
public int available() {
if (message != null) {
return message.getSerializedSize();
} else if (partial != null) {

View File

@ -40,9 +40,10 @@ import java.lang.ref.WeakReference;
* Utility methods for using protobuf with grpc.
*/
@ExperimentalApi("Experimental until Lite is stable in protobuf")
public class ProtoLiteUtils {
public final class ProtoLiteUtils {
private static volatile ExtensionRegistryLite globalRegistry =
// default visibility to avoid synthetic accessors
static volatile ExtensionRegistryLite globalRegistry =
ExtensionRegistryLite.getEmptyRegistry();
private static final int BUF_SIZE = 8192;
@ -66,149 +67,31 @@ public class ProtoLiteUtils {
* <p>If you need custom parsing behavior for protos, you will need to make your own
* {@code MethodDescriptor.Marshaller} for the time being.
*
* @since 1.0.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1787")
public static void setExtensionRegistry(ExtensionRegistryLite newRegistry) {
globalRegistry = checkNotNull(newRegistry, "newRegistry");
}
private static final ThreadLocal<Reference<byte[]>> bufs = new ThreadLocal<Reference<byte[]>>() {
@Override
protected Reference<byte[]> initialValue() {
return new WeakReference<byte[]>(new byte[4096]); // Picked at random.
}
};
/** Create a {@code Marshaller} for protos of the same type as {@code defaultInstance}. */
public static <T extends MessageLite> Marshaller<T> marshaller(final T defaultInstance) {
@SuppressWarnings("unchecked")
final Parser<T> parser = (Parser<T>) defaultInstance.getParserForType();
/**
* Creates a {@link Marshaller} for protos of the same type as {@code defaultInstance}.
*
* @since 1.0.0
*/
public static <T extends MessageLite> Marshaller<T> marshaller(T defaultInstance) {
// TODO(ejona): consider changing return type to PrototypeMarshaller (assuming ABI safe)
return new PrototypeMarshaller<T>() {
@SuppressWarnings("unchecked")
@Override
public Class<T> getMessageClass() {
// Precisely T since protobuf doesn't let messages extend other messages.
return (Class<T>) defaultInstance.getClass();
}
@Override
public T getMessagePrototype() {
return defaultInstance;
}
@Override
public InputStream stream(T value) {
return new ProtoInputStream(value, parser);
}
@Override
public T parse(InputStream stream) {
if (stream instanceof ProtoInputStream) {
ProtoInputStream protoStream = (ProtoInputStream) stream;
// Optimization for in-memory transport. Returning provided object is safe since protobufs
// are immutable.
//
// However, we can't assume the types match, so we have to verify the parser matches.
// Today the parser is always the same for a given proto, but that isn't guaranteed. Even
// if not, using the same MethodDescriptor would ensure the parser matches and permit us
// to enable this optimization.
if (protoStream.parser() == parser) {
try {
@SuppressWarnings("unchecked")
T message = (T) ((ProtoInputStream) stream).message();
return message;
} catch (IllegalStateException ex) {
// Stream must have been read from, which is a strange state. Since the point of this
// optimization is to be transparent, instead of throwing an error we'll continue,
// even though it seems likely there's a bug.
}
}
}
CodedInputStream cis = null;
try {
if (stream instanceof KnownLength) {
int size = stream.available();
if (size > 0 && size <= DEFAULT_MAX_MESSAGE_SIZE) {
// buf should not be used after this method has returned.
byte[] buf = bufs.get().get();
if (buf == null || buf.length < size) {
buf = new byte[size];
bufs.set(new WeakReference<byte[]>(buf));
}
int remaining = size;
while (remaining > 0) {
int position = size - remaining;
int count = stream.read(buf, position, remaining);
if (count == -1) {
break;
}
remaining -= count;
}
if (remaining != 0) {
int position = size - remaining;
throw new RuntimeException("size inaccurate: " + size + " != " + position);
}
cis = CodedInputStream.newInstance(buf, 0, size);
} else if (size == 0) {
return defaultInstance;
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
if (cis == null) {
cis = CodedInputStream.newInstance(stream);
}
// Pre-create the CodedInputStream so that we can remove the size limit restriction
// when parsing.
cis.setSizeLimit(Integer.MAX_VALUE);
try {
return parseFrom(cis);
} catch (InvalidProtocolBufferException ipbe) {
throw Status.INTERNAL.withDescription("Invalid protobuf byte sequence")
.withCause(ipbe).asRuntimeException();
}
}
private T parseFrom(CodedInputStream stream) throws InvalidProtocolBufferException {
T message = parser.parseFrom(stream, globalRegistry);
try {
stream.checkLastTagWas(0);
return message;
} catch (InvalidProtocolBufferException e) {
e.setUnfinishedMessage(message);
throw e;
}
}
};
return new MessageMarshaller<T>(defaultInstance);
}
/**
* Produce a metadata marshaller for a protobuf type.
*
* @since 1.0.0
*/
public static <T extends MessageLite> Metadata.BinaryMarshaller<T> metadataMarshaller(
final T instance) {
return new Metadata.BinaryMarshaller<T>() {
@Override
public byte[] toBytes(T value) {
return value.toByteArray();
}
@Override
@SuppressWarnings("unchecked")
public T parseBytes(byte[] serialized) {
try {
return (T) instance.getParserForType().parseFrom(serialized, globalRegistry);
} catch (InvalidProtocolBufferException ipbe) {
throw new IllegalArgumentException(ipbe);
}
}
};
T defaultInstance) {
return new MetadataMarshaller<T>(defaultInstance);
}
/** Copies the data from input stream to output stream. */
@ -231,4 +114,145 @@ public class ProtoLiteUtils {
private ProtoLiteUtils() {
}
private static final class MessageMarshaller<T extends MessageLite>
implements PrototypeMarshaller<T> {
private static final ThreadLocal<Reference<byte[]>> bufs = new ThreadLocal<Reference<byte[]>>();
private final Parser<T> parser;
private final T defaultInstance;
@SuppressWarnings("unchecked")
MessageMarshaller(T defaultInstance) {
this.defaultInstance = defaultInstance;
parser = (Parser<T>) defaultInstance.getParserForType();
}
@SuppressWarnings("unchecked")
@Override
public Class<T> getMessageClass() {
// Precisely T since protobuf doesn't let messages extend other messages.
return (Class<T>) defaultInstance.getClass();
}
@Override
public T getMessagePrototype() {
return defaultInstance;
}
@Override
public InputStream stream(T value) {
return new ProtoInputStream(value, parser);
}
@Override
public T parse(InputStream stream) {
if (stream instanceof ProtoInputStream) {
ProtoInputStream protoStream = (ProtoInputStream) stream;
// Optimization for in-memory transport. Returning provided object is safe since protobufs
// are immutable.
//
// However, we can't assume the types match, so we have to verify the parser matches.
// Today the parser is always the same for a given proto, but that isn't guaranteed. Even
// if not, using the same MethodDescriptor would ensure the parser matches and permit us
// to enable this optimization.
if (protoStream.parser() == parser) {
try {
@SuppressWarnings("unchecked")
T message = (T) ((ProtoInputStream) stream).message();
return message;
} catch (IllegalStateException ex) {
// Stream must have been read from, which is a strange state. Since the point of this
// optimization is to be transparent, instead of throwing an error we'll continue,
// even though it seems likely there's a bug.
}
}
}
CodedInputStream cis = null;
try {
if (stream instanceof KnownLength) {
int size = stream.available();
if (size > 0 && size <= DEFAULT_MAX_MESSAGE_SIZE) {
Reference<byte[]> ref;
// buf should not be used after this method has returned.
byte[] buf;
if ((ref = bufs.get()) == null || (buf = ref.get()) == null || buf.length < size) {
buf = new byte[size];
bufs.set(new WeakReference<byte[]>(buf));
}
int remaining = size;
while (remaining > 0) {
int position = size - remaining;
int count = stream.read(buf, position, remaining);
if (count == -1) {
break;
}
remaining -= count;
}
if (remaining != 0) {
int position = size - remaining;
throw new RuntimeException("size inaccurate: " + size + " != " + position);
}
cis = CodedInputStream.newInstance(buf, 0, size);
} else if (size == 0) {
return defaultInstance;
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
if (cis == null) {
cis = CodedInputStream.newInstance(stream);
}
// Pre-create the CodedInputStream so that we can remove the size limit restriction
// when parsing.
cis.setSizeLimit(Integer.MAX_VALUE);
try {
return parseFrom(cis);
} catch (InvalidProtocolBufferException ipbe) {
throw Status.INTERNAL.withDescription("Invalid protobuf byte sequence")
.withCause(ipbe).asRuntimeException();
}
}
private T parseFrom(CodedInputStream stream) throws InvalidProtocolBufferException {
T message = parser.parseFrom(stream, globalRegistry);
try {
stream.checkLastTagWas(0);
return message;
} catch (InvalidProtocolBufferException e) {
e.setUnfinishedMessage(message);
throw e;
}
}
}
private static final class MetadataMarshaller<T extends MessageLite>
implements Metadata.BinaryMarshaller<T> {
private final T defaultInstance;
MetadataMarshaller(T defaultInstance) {
this.defaultInstance = defaultInstance;
}
@Override
public byte[] toBytes(T value) {
return value.toByteArray();
}
@Override
@SuppressWarnings("unchecked")
public T parseBytes(byte[] serialized) {
try {
return (T) defaultInstance.getParserForType().parseFrom(serialized, globalRegistry);
} catch (InvalidProtocolBufferException ipbe) {
throw new IllegalArgumentException(ipbe);
}
}
}
}

View File

@ -22,6 +22,8 @@ import com.google.protobuf.nano.MessageNano;
* Produce new message instances. Used by a marshaller to deserialize incoming messages.
*
* <p>Should be implemented by generated code.
*
* @since 1.0.0
*/
public interface MessageNanoFactory<T extends MessageNano> {
T newInstance();

View File

@ -27,7 +27,7 @@ import javax.annotation.Nullable;
/**
* An {@link InputStream} backed by a nano proto.
*/
class NanoProtoInputStream extends InputStream implements KnownLength {
final class NanoProtoInputStream extends InputStream implements KnownLength {
// NanoProtoInputStream is first initialized with a *message*. *partial* is initially null.
// Once there has been a read operation on this stream, *message* is serialized to *partial* and
@ -35,7 +35,7 @@ class NanoProtoInputStream extends InputStream implements KnownLength {
@Nullable private MessageNano message;
@Nullable private ByteArrayInputStream partial;
public NanoProtoInputStream(MessageNano message) {
NanoProtoInputStream(MessageNano message) {
this.message = message;
}
@ -84,7 +84,7 @@ class NanoProtoInputStream extends InputStream implements KnownLength {
}
@Override
public int available() throws IOException {
public int available() {
if (message != null) {
return message.getSerializedSize();
} else if (partial != null) {

View File

@ -30,60 +30,71 @@ import java.io.OutputStream;
/**
* Utility methods for using nano proto with grpc.
*/
public class NanoUtils {
private static final int BUF_SIZE = 8192;
public final class NanoUtils {
private NanoUtils() {}
/** Adapt {@code parser} to a {@code Marshaller}. */
public static <T extends MessageNano> Marshaller<T> marshaller(
final MessageNanoFactory<T> factory) {
return new Marshaller<T>() {
@Override
public InputStream stream(T value) {
return new NanoProtoInputStream(value);
}
@Override
public T parse(InputStream stream) {
try {
// TODO(simonma): Investigate whether we can do 0-copy here.
CodedInputByteBufferNano input =
CodedInputByteBufferNano.newInstance(toByteArray(stream));
input.setSizeLimit(Integer.MAX_VALUE);
T message = factory.newInstance();
message.mergeFrom(input);
return message;
} catch (IOException ipbe) {
throw Status.INTERNAL.withDescription("Failed parsing nano proto message").withCause(ipbe)
.asRuntimeException();
}
}
};
/**
* Adapt {@code parser} to a {@link Marshaller}.
*
* @since 1.0.0
*/
public static <T extends MessageNano> Marshaller<T> marshaller(MessageNanoFactory<T> factory) {
return new MessageMarshaller<T>(factory);
}
// Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
private static byte[] toByteArray(InputStream in) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
copy(in, out);
return out.toByteArray();
}
private static final class MessageMarshaller<T extends MessageNano> implements Marshaller<T> {
private static final int BUF_SIZE = 8192;
// Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
private static long copy(InputStream from, OutputStream to) throws IOException {
checkNotNull(from);
checkNotNull(to);
byte[] buf = new byte[BUF_SIZE];
long total = 0;
while (true) {
int r = from.read(buf);
if (r == -1) {
break;
}
to.write(buf, 0, r);
total += r;
private final MessageNanoFactory<T> factory;
MessageMarshaller(MessageNanoFactory<T> factory) {
this.factory = factory;
}
@Override
public InputStream stream(T value) {
return new NanoProtoInputStream(value);
}
@Override
public T parse(InputStream stream) {
try {
// TODO(simonma): Investigate whether we can do 0-copy here.
CodedInputByteBufferNano input =
CodedInputByteBufferNano.newInstance(toByteArray(stream));
input.setSizeLimit(Integer.MAX_VALUE);
T message = factory.newInstance();
message.mergeFrom(input);
return message;
} catch (IOException ipbe) {
throw Status.INTERNAL.withDescription("Failed parsing nano proto message").withCause(ipbe)
.asRuntimeException();
}
}
// Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
private static byte[] toByteArray(InputStream in) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
copy(in, out);
return out.toByteArray();
}
// Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
private static long copy(InputStream from, OutputStream to) throws IOException {
checkNotNull(from);
checkNotNull(to);
byte[] buf = new byte[BUF_SIZE];
long total = 0;
while (true) {
int r = from.read(buf);
if (r == -1) {
break;
}
to.write(buf, 0, r);
total += r;
}
return total;
}
return total;
}
}

View File

@ -20,6 +20,8 @@ import com.google.protobuf.Descriptors.FileDescriptor;
/**
* Provides access to the underlying proto file descriptor.
*
* @since 1.1.0
*/
public interface ProtoFileDescriptorSupplier {
/**

View File

@ -25,15 +25,21 @@ import io.grpc.protobuf.lite.ProtoLiteUtils;
/**
* Utility methods for using protobuf with grpc.
*/
public class ProtoUtils {
public final class ProtoUtils {
/** Create a {@code Marshaller} for protos of the same type as {@code defaultInstance}. */
/**
* Create a {@link Marshaller} for protos of the same type as {@code defaultInstance}.
*
* @since 1.0.0
*/
public static <T extends Message> Marshaller<T> marshaller(final T defaultInstance) {
return ProtoLiteUtils.marshaller(defaultInstance);
}
/**
* Produce a metadata key for a generated protobuf type.
*
* @since 1.0.0
*/
public static <T extends Message> Metadata.Key<T> keyForProto(T instance) {
return Metadata.Key.of(