Add a way to distinguish between advertised message encodings, and add tests

This commit is contained in:
Carl Mastrangelo 2015-08-28 11:27:54 -07:00
parent bdaf7b3236
commit 1ad2bf9eda
17 changed files with 448 additions and 185 deletions

View File

@ -33,8 +33,6 @@ package io.grpc;
import com.google.common.base.Objects;
import io.grpc.MessageEncoding.Compressor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

View File

@ -0,0 +1,95 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
/**
* Encloses classes related to the compression and decompression of messages.
*
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/492")
public interface Codec extends Compressor, Decompressor {
/**
* A gzip compressor and decompressor. In the future this will likely support other
* compression methods, such as compression level.
*/
public static final class Gzip implements Codec {
@Override
public String getMessageEncoding() {
return "gzip";
}
@Override
public OutputStream compress(OutputStream os) throws IOException {
return new GZIPOutputStream(os);
}
@Override
public InputStream decompress(InputStream is) throws IOException {
return new GZIPInputStream(is);
}
}
/**
* The "identity", or "none" codec. This codec is special in that it can be used to explicitly
* disable Call compression on a Channel that by default compresses.
*/
public static final class Identity implements Codec {
/**
* Special sentinel codec indicating that no compression should be used. Users should use
* reference equality to see if compression is disabled.
*/
public static final Codec NONE = new Identity();
@Override
public InputStream decompress(InputStream is) throws IOException {
return is;
}
@Override
public String getMessageEncoding() {
return "identity";
}
@Override
public OutputStream compress(OutputStream os) throws IOException {
return os;
}
private Identity() {}
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import java.io.IOException;
import java.io.OutputStream;
/**
* Represents a message compressor.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/492")
public interface Compressor {
/**
* Returns the message encoding that this compressor uses.
*
* <p>This can be values such as "gzip", "deflate", "snappy", etc.
*/
String getMessageEncoding();
/**
* Wraps an existing output stream with a compressing output stream.
* @param os The output stream of uncompressed data
* @return An output stream that compresses
*/
OutputStream compress(OutputStream os) throws IOException;
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import java.io.IOException;
import java.io.InputStream;
/**
* Represents a message decompressor.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/492")
public interface Decompressor {
/**
* Returns the message encoding that this compressor uses.
*
* <p>This can be values such as "gzip", "deflate", "snappy", etc.
*/
String getMessageEncoding();
/**
* Wraps an existing input stream with a decompressing input stream.
* @param is The input stream of uncompressed data
* @return An input stream that decompresses
*/
InputStream decompress(InputStream is) throws IOException;
}

View File

@ -0,0 +1,122 @@
package io.grpc;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
/**
* Encloses classes related to the compression and decompression of messages.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/492")
public final class DecompressorRegistry {
private static final DecompressorRegistry INSTANCE = new DecompressorRegistry();
private final ConcurrentMap<String, DecompressorInfo> decompressors;
/**
* Registers a decompressor for both decompression and message encoding negotiation.
*
* @param d The decompressor to register
* @param advertised If true, the message encoding will be listed in the Accept-Encoding header.
* @throws IllegalArgumentException if another compressor by the same name is already registered.
*/
public static void register(Decompressor d, boolean advertised) {
INSTANCE.internalRegister(d, advertised);
}
@VisibleForTesting
void internalRegister(Decompressor d, boolean advertised) {
DecompressorInfo previousInfo = decompressors.putIfAbsent(
d.getMessageEncoding(), new DecompressorInfo(d, advertised));
if (previousInfo != null) {
throw new IllegalArgumentException(
"A decompressor was already registered: " + previousInfo.decompressor);
}
}
/**
* Provides a list of all message encodings that have decompressors available.
*/
public static Set<String> getKnownMessageEncodings() {
return INSTANCE.internalGetKnownMessageEncodings();
}
@VisibleForTesting
Set<String> internalGetKnownMessageEncodings() {
return Collections.unmodifiableSet(decompressors.keySet());
}
/**
* Provides a list of all message encodings that have decompressors available and should be
* advertised.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/492")
public static Set<String> getAdvertisedMessageEncodings() {
return INSTANCE.internalGetAdvertisedMessageEncodings();
}
@VisibleForTesting
Set<String> internalGetAdvertisedMessageEncodings() {
Set<String> advertisedDecompressors = new HashSet<String>();
for (Entry<String, DecompressorInfo> entry : decompressors.entrySet()) {
if (entry.getValue().advertised) {
advertisedDecompressors.add(entry.getKey());
}
}
return Collections.unmodifiableSet(advertisedDecompressors);
}
/**
* Returns a decompressor for the given message encoding, or {@code null} if none has been
* registered.
*
* <p>This ignores whether the compressor is advertised. According to the spec, if we know how
* to process this encoding, we attempt to, regardless of whether or not it is part of the
* encodings sent to the remote host.
*/
@Nullable
public static Decompressor lookupDecompressor(String messageEncoding) {
return INSTANCE.internalLookupDecompressor(messageEncoding);
}
@Nullable
@VisibleForTesting
Decompressor internalLookupDecompressor(String messageEncoding) {
DecompressorInfo info = decompressors.get(messageEncoding);
return info != null ? info.decompressor : null;
}
@VisibleForTesting
DecompressorRegistry() {
decompressors = new ConcurrentHashMap<String, DecompressorInfo>();
Decompressor gzip = new Codec.Gzip();
// By default, Gzip
decompressors.put(gzip.getMessageEncoding(), new DecompressorInfo(gzip, false));
decompressors.put(
Codec.Identity.NONE.getMessageEncoding(), new DecompressorInfo(Codec.Identity.NONE, false));
}
/**
* Information about a decompressor.
*/
private static final class DecompressorInfo {
private final Decompressor decompressor;
private volatile boolean advertised;
DecompressorInfo(Decompressor decompressor, boolean advertised) {
this.decompressor = checkNotNull(decompressor);
this.advertised = advertised;
}
}
}

View File

@ -1,143 +0,0 @@
package io.grpc;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
/**
* Encloses classes related to the compression and decompression of messages.
*/
public final class MessageEncoding {
/**
* Special sentinel codec indicating that no compression should be used. Users should use
* reference equality to see if compression is disabled.
*/
public static final Codec NONE = new Codec() {
@Override
public InputStream decompress(InputStream is) throws IOException {
return is;
}
@Override
public String getMessageEncoding() {
return "identity";
}
@Override
public OutputStream compress(OutputStream os) throws IOException {
return os;
}
};
private static final ConcurrentMap<String, Decompressor> decompressors =
initializeDefaultDecompressors();
/**
* Represents a message compressor.
*/
public interface Compressor {
/**
* Returns the message encoding that this compressor uses.
*
* <p>This can be values such as "gzip", "deflate", "snappy", etc.
*/
String getMessageEncoding();
/**
* Wraps an existing output stream with a compressing output stream.
* @param os The output stream of uncompressed data
* @return An output stream that compresses
*/
OutputStream compress(OutputStream os) throws IOException;
}
/**
* Represents a message decompressor.
*/
public interface Decompressor {
/**
* Returns the message encoding that this compressor uses.
*
* <p>This can be values such as "gzip", "deflate", "snappy", etc.
*/
String getMessageEncoding();
/**
* Wraps an existing input stream with a decompressing input stream.
* @param is The input stream of uncompressed data
* @return An input stream that decompresses
*/
InputStream decompress(InputStream is) throws IOException;
}
/**
* Represents an object that can both compress and decompress messages.
*/
public interface Codec extends Compressor, Decompressor {}
/**
* A gzip compressor and decompressor. In the future this will likely support other
* compression methods, such as compression level.
*/
public static final class Gzip implements Codec {
@Override
public String getMessageEncoding() {
return "gzip";
}
@Override
public OutputStream compress(OutputStream os) throws IOException {
return new GZIPOutputStream(os);
}
@Override
public InputStream decompress(InputStream is) throws IOException {
return new GZIPInputStream(is);
}
}
/**
* Registers a decompressor for both decompression and message encoding negotiation.
* @throws IllegalArgumentException if another compressor by the same name is already registered.
*/
public static final void registerDecompressor(Decompressor d) {
Decompressor previousDecompressor = decompressors.putIfAbsent(d.getMessageEncoding(), d);
checkArgument(previousDecompressor == null,
"A decompressor was already registered: %s", previousDecompressor);
}
/**
* Provides a list of all message encodings that have decompressors available.
*/
public static Collection<String> getKnownMessageEncodings() {
return Collections.unmodifiableSet(decompressors.keySet());
}
/**
* Returns a decompressor for the given message encoding, or {@code null} if none has been
* registered.
*/
@Nullable
public static Decompressor lookupDecompressor(String messageEncoding) {
return decompressors.get(messageEncoding);
}
private static ConcurrentMap<String, Decompressor> initializeDefaultDecompressors() {
ConcurrentMap<String, Decompressor> defaultDecompressors =
new ConcurrentHashMap<String, Decompressor>();
Decompressor gzip = new Gzip();
defaultDecompressors.put(gzip.getMessageEncoding(), gzip);
defaultDecompressors.put(NONE.getMessageEncoding(), NONE);
return defaultDecompressors;
}
}

View File

@ -31,7 +31,7 @@
package io.grpc.inprocess;
import io.grpc.MessageEncoding.Compressor;
import io.grpc.Compressor;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;

View File

@ -38,9 +38,10 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import io.grpc.MessageEncoding;
import io.grpc.MessageEncoding.Compressor;
import io.grpc.MessageEncoding.Decompressor;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import java.io.InputStream;
@ -130,7 +131,7 @@ public abstract class AbstractStream<IdT> implements Stream {
};
framer = new MessageFramer(outboundFrameHandler, bufferAllocator);
deframer = new MessageDeframer(inboundMessageHandler, MessageEncoding.NONE, maxMessageSize);
deframer = new MessageDeframer(inboundMessageHandler, Codec.Identity.NONE, maxMessageSize);
}
@Override
@ -303,13 +304,13 @@ public abstract class AbstractStream<IdT> implements Stream {
/**
* Looks up the decompressor by its message encoding name, and sets it for this stream.
* Decompressors are registered with {@link MessageEncoding#registerDecompressor}.
* Decompressors are registered with {@link DecompressorRegistry#registerDecompressor}.
*
* @param messageEncoding the name of the encoding provided by the remote host
* @throws IllegalArgumentException if the provided message encoding cannot be found.
*/
protected final void setDecompressor(String messageEncoding) {
Decompressor d = MessageEncoding.lookupDecompressor(messageEncoding);
Decompressor d = DecompressorRegistry.lookupDecompressor(messageEncoding);
checkArgument(d != null,
"Unable to find decompressor for message encoding %s", messageEncoding);
setDecompressor(d);

View File

@ -41,8 +41,8 @@ import com.google.common.base.Throwables;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.MessageEncoding;
import io.grpc.MessageEncoding.Compressor;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
@ -142,7 +142,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
headers.removeAll(MESSAGE_ENCODING_KEY);
Compressor compressor = callOptions.getCompressor();
if (compressor != null && compressor != MessageEncoding.NONE) {
if (compressor != null && compressor != Codec.Identity.NONE) {
headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
}

View File

@ -38,10 +38,10 @@ import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.ExperimentalApi;
import io.grpc.ManagedChannel;
import io.grpc.MessageEncoding;
import io.grpc.MessageEncoding.Compressor;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
@ -136,12 +136,12 @@ public final class ManagedChannelImpl extends ManagedChannel {
* If the remote host does not support the message encoding, the call will likely break. There
* is currently no provided way to discover what message encodings the remote host supports.
* @param c The compressor to use. If {@code null} no compression will by performed. This is
* equivalent to using {@link MessageEncoding#NONE}. If not null, the Comressor must be
* equivalent to using {@code Codec.Identity.NONE}. If not null, the Compressor must be
* threadsafe.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/492")
public void setDefaultCompressor(@Nullable Compressor c) {
defaultCompressor = (c != null) ? c : MessageEncoding.NONE;
defaultCompressor = (c != null) ? c : Codec.Identity.NONE;
}
/**
@ -244,7 +244,7 @@ public final class ManagedChannelImpl extends ManagedChannel {
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions) {
boolean hasCodecOverride = callOptions.getCompressor() != null;
if (!hasCodecOverride && defaultCompressor != MessageEncoding.NONE) {
if (!hasCodecOverride && defaultCompressor != Codec.Identity.NONE) {
callOptions = callOptions.withCompressor(defaultCompressor);
}
return interceptorChannel.newCall(method, callOptions);

View File

@ -35,7 +35,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Preconditions;
import io.grpc.MessageEncoding;
import io.grpc.Codec;
import io.grpc.Decompressor;
import io.grpc.Status;
import java.io.Closeable;
@ -96,7 +97,7 @@ public class MessageDeframer implements Closeable {
private final Listener listener;
private final int maxMessageSize;
private MessageEncoding.Decompressor decompressor;
private Decompressor decompressor;
private State state = State.HEADER;
private int requiredLength = HEADER_LENGTH;
private boolean compressedFlag;
@ -115,8 +116,7 @@ public class MessageDeframer implements Closeable {
* {@code NONE} meaning unsupported
* @param maxMessageSize the maximum allowed size for received messages.
*/
public MessageDeframer(Listener listener, MessageEncoding.Decompressor decompressor,
int maxMessageSize) {
public MessageDeframer(Listener listener, Decompressor decompressor, int maxMessageSize) {
this.listener = Preconditions.checkNotNull(listener, "sink");
this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor");
this.maxMessageSize = maxMessageSize;
@ -129,7 +129,7 @@ public class MessageDeframer implements Closeable {
*
* @param decompressor the decompressing wrapper.
*/
public void setDecompressor(MessageEncoding.Decompressor decompressor) {
public void setDecompressor(Decompressor decompressor) {
this.decompressor = checkNotNull(decompressor, "Can't pass an empty decompressor");
}
@ -359,7 +359,7 @@ public class MessageDeframer implements Closeable {
}
private InputStream getCompressedBody() {
if (decompressor == MessageEncoding.NONE) {
if (decompressor == Codec.Identity.NONE) {
throw Status.INTERNAL.withDescription(
"Can't decode compressed frame as compression not configured.").asRuntimeException();
}

View File

@ -37,10 +37,10 @@ import static java.lang.Math.min;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Drainable;
import io.grpc.KnownLength;
import io.grpc.MessageEncoding;
import io.grpc.MessageEncoding.Compressor;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -91,7 +91,7 @@ public class MessageFramer {
* @param bufferAllocator allocates buffers that the transport can commit to the wire.
*/
public MessageFramer(Sink sink, WritableBufferAllocator bufferAllocator) {
this(sink, bufferAllocator, MessageEncoding.NONE);
this(sink, bufferAllocator, Codec.Identity.NONE);
}
/**
@ -101,14 +101,13 @@ public class MessageFramer {
* @param bufferAllocator allocates buffers that the transport can commit to the wire.
* @param compressor the compressor to use
*/
public MessageFramer(Sink sink, WritableBufferAllocator bufferAllocator,
MessageEncoding.Compressor compressor) {
public MessageFramer(Sink sink, WritableBufferAllocator bufferAllocator, Compressor compressor) {
this.sink = Preconditions.checkNotNull(sink, "sink");
this.bufferAllocator = bufferAllocator;
this.compressor = Preconditions.checkNotNull(compressor, "compressor");
}
public void setCompressor(MessageEncoding.Compressor compressor) {
public void setCompressor(Compressor compressor) {
this.compressor = checkNotNull(compressor, "Can't pass an empty compressor");
}
@ -120,7 +119,7 @@ public class MessageFramer {
public void writePayload(InputStream message) {
verifyNotClosed();
try {
if (compressor != MessageEncoding.NONE) {
if (compressor != Codec.Identity.NONE) {
writeCompressed(message);
} else {
writeUncompressed(message);
@ -192,8 +191,7 @@ public class MessageFramer {
/**
* Write a message that has been serialized to a sequence of buffers.
*/
private void writeBufferChain(BufferChainOutputStream bufferChain, boolean compressed)
throws IOException {
private void writeBufferChain(BufferChainOutputStream bufferChain, boolean compressed) {
ByteBuffer header = ByteBuffer.wrap(headerScratch);
header.put(compressed ? COMPRESSED : UNCOMPRESSED);
int messageLength = bufferChain.readableBytes();

View File

@ -31,7 +31,7 @@
package io.grpc.internal;
import io.grpc.MessageEncoding.Compressor;
import io.grpc.Compressor;
import java.io.InputStream;

View File

@ -0,0 +1,80 @@
package io.grpc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Set;
/**
* Tests for {@link DecompressorRegistry}.
*/
@RunWith(JUnit4.class)
public class DecompressorRegistryTest {
private final Dummy dummyDecompressor = new Dummy();
private final DecompressorRegistry registry = new DecompressorRegistry();
@Test
public void lookupDecompressor_checkDefaultMessageEncodingsExist() {
// Explicitly put the names in, rather than link against MessageEncoding
assertNotNull("Expected identity to be registered",
registry.internalLookupDecompressor("identity"));
assertNotNull("Expected gzip to be registered",
registry.internalLookupDecompressor("gzip"));
}
@Test
public void getKnownMessageEncodings_checkDefaultMessageEncodingsExist() {
Set<String> knownEncodings = new HashSet<String>();
knownEncodings.add("identity");
knownEncodings.add("gzip");
assertEquals(knownEncodings, registry.internalGetKnownMessageEncodings());
}
/*
* This test will likely change once encoders are advertised
*/
@Test
public void getAdvertisedMessageEncodings_noEncodingsAdvertised() {
assertTrue(registry.internalGetAdvertisedMessageEncodings().isEmpty());
}
@Test
public void registerDecompressor_advertisedDecompressor() {
registry.internalRegister(dummyDecompressor, true);
assertTrue(registry.internalGetAdvertisedMessageEncodings()
.contains(dummyDecompressor.getMessageEncoding()));
}
@Test
public void registerDecompressor_nonadvertisedDecompressor() {
DecompressorRegistry.register(dummyDecompressor, false);
assertFalse(registry.internalGetAdvertisedMessageEncodings()
.contains(dummyDecompressor.getMessageEncoding()));
}
private static final class Dummy implements Decompressor {
@Override
public String getMessageEncoding() {
return "dummy";
}
@Override
public InputStream decompress(InputStream is) throws IOException {
return is;
}
}
}

View File

@ -38,7 +38,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.verify;
import io.grpc.MessageEncoding;
import io.grpc.Codec;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
@ -211,7 +211,7 @@ public class AbstractClientStreamTest {
AbstractClientStream<Integer> stream =
new BaseAbstractClientStream<Integer>(allocator, mockListener);
Metadata headers = new Metadata();
headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, new MessageEncoding.Gzip().getMessageEncoding());
headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, new Codec.Gzip().getMessageEncoding());
stream.inboundHeadersReceived(headers);
verify(mockListener).headersRead(headers);
@ -222,7 +222,7 @@ public class AbstractClientStreamTest {
AbstractClientStream<Integer> stream =
new BaseAbstractClientStream<Integer>(allocator, mockListener);
Metadata headers = new Metadata();
headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, MessageEncoding.NONE.getMessageEncoding());
headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, Codec.Identity.NONE.getMessageEncoding());
stream.inboundHeadersReceived(headers);
verify(mockListener).headersRead(headers);

View File

@ -45,7 +45,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Bytes;
import io.grpc.MessageEncoding;
import io.grpc.Codec;
import io.grpc.internal.MessageDeframer.Listener;
import org.junit.Test;
@ -68,7 +68,7 @@ import java.util.zip.GZIPOutputStream;
@RunWith(JUnit4.class)
public class MessageDeframerTest {
private Listener listener = mock(Listener.class);
private MessageDeframer deframer = new MessageDeframer(listener, MessageEncoding.NONE,
private MessageDeframer deframer = new MessageDeframer(listener, Codec.Identity.NONE,
DEFAULT_MAX_MESSAGE_SIZE);
private ArgumentCaptor<InputStream> messages = ArgumentCaptor.forClass(InputStream.class);
@ -178,7 +178,7 @@ public class MessageDeframerTest {
@Test
public void compressed() {
deframer = new MessageDeframer(listener, new MessageEncoding.Gzip(), DEFAULT_MAX_MESSAGE_SIZE);
deframer = new MessageDeframer(listener, new Codec.Gzip(), DEFAULT_MAX_MESSAGE_SIZE);
deframer.request(1);
byte[] payload = compress(new byte[1000]);

View File

@ -40,7 +40,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import io.grpc.MessageEncoding;
import io.grpc.Codec;
import org.junit.Before;
import org.junit.Test;
@ -236,7 +236,7 @@ public class MessageFramerTest {
@Test
public void compressed() throws Exception {
allocator = new BytesWritableBufferAllocator(100, Integer.MAX_VALUE);
framer = new MessageFramer(sink, allocator, new MessageEncoding.Gzip());
framer = new MessageFramer(sink, allocator, new Codec.Gzip());
writeKnownLength(framer, new byte[1000]);
// The GRPC header is written first as a separate frame
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(false));
@ -274,7 +274,7 @@ public class MessageFramerTest {
}
}
};
framer = new MessageFramer(reentrant, allocator, MessageEncoding.NONE);
framer = new MessageFramer(reentrant, allocator, Codec.Identity.NONE);
writeKnownLength(framer, new byte[]{3, 14});
framer.close();
}