Replace DeferredInputStream with interface Drainable.

- Rename flushTo() to drainTo().
- Remove flushTo() from DeferredNanoProtoInputStream (which is renamed
  to NanoProtoInputStream), because the optimization is not implemented.
- Rename DeferredProtoInputStream to ProtoInputStream.

 #529
This commit is contained in:
Kun Zhang 2015-07-20 15:42:34 -07:00
parent eb92967a7e
commit a6585e36ed
7 changed files with 34 additions and 51 deletions

View File

@ -5,7 +5,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.CallOptions;
import io.grpc.ChannelImpl;
import io.grpc.ClientCall;
import io.grpc.DeferredInputStream;
import io.grpc.Drainable;
import io.grpc.KnownLength;
import io.grpc.Marshaller;
import io.grpc.Metadata;
@ -504,7 +504,7 @@ public abstract class AbstractBenchmark {
@Override
public InputStream stream(ByteBuf value) {
return new DeferredByteBufInputStream(value);
return new ByteBufInputStream(value);
}
@Override
@ -521,19 +521,19 @@ public abstract class AbstractBenchmark {
}
/**
* Implementation of {@link io.grpc.DeferredInputStream} for {@link io.netty.buffer.ByteBuf}.
* A {@link Drainable} {@code InputStream} that reads an {@link io.netty.buffer.ByteBuf}.
*/
private static class DeferredByteBufInputStream extends DeferredInputStream<ByteBuf>
implements KnownLength {
private static class ByteBufInputStream extends InputStream
implements Drainable, KnownLength {
private ByteBuf buf;
private DeferredByteBufInputStream(ByteBuf buf) {
private ByteBufInputStream(ByteBuf buf) {
this.buf = buf;
}
@Override
public int flushTo(OutputStream target) throws IOException {
public int drainTo(OutputStream target) throws IOException {
int readbableBytes = buf.readableBytes();
buf.readBytes(target, readbableBytes);
buf = null;

View File

@ -32,21 +32,24 @@
package io.grpc;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* Extension to {@link InputStream} to allow for deferred production of data. Allows for
* zero-copy conversions when the goal is to copy the contents of a resource to a
* stream or buffer.
* Extension to an {@link java.io.InputStream} or alike by adding a method that transfers all
* content to an {@link OutputStream}.
*
* <p>This can be used for optimizing for the case where the content of the input stream will be
* written to an {@link OutputStream} eventually. Instead of copying the content to a byte array
* through {@code read()}, then writing the the {@code OutputStream}, the implementation can write
* the content directly to the {@code OutputStream}.
*/
public abstract class DeferredInputStream<T> extends InputStream {
public interface Drainable {
/**
* Produce the entire contents of this stream to the specified target.
* Transfers the entire contents of this stream to the specified target.
*
* @param target to write to.
* @return number of bytes written.
*/
public abstract int flushTo(OutputStream target) throws IOException;
int drainTo(OutputStream target) throws IOException;
}

View File

@ -36,7 +36,7 @@ import static java.lang.Math.min;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import io.grpc.DeferredInputStream;
import io.grpc.Drainable;
import io.grpc.KnownLength;
import java.io.ByteArrayInputStream;
@ -218,8 +218,8 @@ public class MessageFramer {
@SuppressWarnings("rawtypes")
private static long writeToOutputStream(InputStream message, OutputStream outputStream)
throws IOException {
if (message instanceof DeferredInputStream) {
return ((DeferredInputStream) message).flushTo(outputStream);
if (message instanceof Drainable) {
return ((Drainable) message).drainTo(outputStream);
} else {
// This makes an unnecessary copy of the bytes when bytebuf supports array(). However, we
// expect performance-critical code to support flushTo().

View File

@ -31,33 +31,29 @@
package io.grpc.protobuf.nano;
import com.google.common.io.ByteStreams;
import com.google.protobuf.nano.CodedOutputByteBufferNano;
import com.google.protobuf.nano.MessageNano;
import io.grpc.DeferredInputStream;
import io.grpc.KnownLength;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.InputStream;
import javax.annotation.Nullable;
/**
* Implementation of {@link DeferredInputStream} backed by a nano proto.
* An {@link InputStream} backed by a nano proto.
*/
public class DeferredNanoProtoInputStream extends DeferredInputStream<MessageNano>
implements KnownLength {
public class NanoProtoInputStream extends InputStream implements KnownLength {
// DeferredNanoProtoInputStream is first initialized with a *message*. *partial* is initially
// null.
// NanoProtoInputStream is first initialized with a *message*. *partial* is initially null.
// Once there has been a read operation on this stream, *message* is serialized to *partial* and
// set to null.
@Nullable private MessageNano message;
@Nullable private ByteArrayInputStream partial;
public DeferredNanoProtoInputStream(MessageNano message) {
public NanoProtoInputStream(MessageNano message) {
this.message = message;
}
@ -68,22 +64,6 @@ public class DeferredNanoProtoInputStream extends DeferredInputStream<MessageNan
}
}
@Override
public int flushTo(OutputStream target) throws IOException {
// TODO(simonma): flushTo is an optimization of DeferredInputStream, for the implementations
// that can write data directly to OutputStream, if we don't support flushTo (by not extending
// DeferredInputStream), the caller will use ByteStreams.copy anyway. So consider extends
// InputStream directly or make a real optimization here (like save the byte[] and use it for a
// single target.write()).
int written = 0;
toPartial();
if (partial != null) {
written = (int) ByteStreams.copy(partial, target);
partial = null;
}
return written;
}
@Override
public int read() throws IOException {
toPartial();

View File

@ -53,7 +53,7 @@ public class NanoUtils {
return new Marshaller<T>() {
@Override
public InputStream stream(T value) {
return new DeferredNanoProtoInputStream(value);
return new NanoProtoInputStream(value);
}
@Override

View File

@ -35,33 +35,33 @@ import com.google.common.io.ByteStreams;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.MessageLite;
import io.grpc.DeferredInputStream;
import io.grpc.Drainable;
import io.grpc.KnownLength;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.annotation.Nullable;
/**
* Implementation of {@link io.grpc.DeferredInputStream} backed by a protobuf.
* An {@link InputStream} backed by a protobuf.
*/
public class DeferredProtoInputStream extends DeferredInputStream<MessageLite>
implements KnownLength {
public class ProtoInputStream extends InputStream implements Drainable, KnownLength {
// DeferredProtoInputStream is first initialized with a *message*. *partial* is initially null.
// ProtoInputStream is first initialized with a *message*. *partial* is initially null.
// Once there has been a read operation on this stream, *message* is serialized to *partial* and
// set to null.
@Nullable private MessageLite message;
@Nullable private ByteArrayInputStream partial;
public DeferredProtoInputStream(MessageLite message) {
public ProtoInputStream(MessageLite message) {
this.message = message;
}
@Override
public int flushTo(OutputStream target) throws IOException {
public int drainTo(OutputStream target) throws IOException {
int written;
if (message != null) {
written = message.getSerializedSize();

View File

@ -52,7 +52,7 @@ public class ProtoUtils {
return new Marshaller<T>() {
@Override
public InputStream stream(T value) {
return new DeferredProtoInputStream(value);
return new ProtoInputStream(value);
}
@Override