Add compressor registry, and auto negotiate compression

This commit is contained in:
Carl Mastrangelo 2015-12-03 15:26:39 -08:00
parent 96f9cefda4
commit 529b14c07b
17 changed files with 270 additions and 178 deletions

View File

@ -55,10 +55,6 @@ public final class CallOptions {
// unnamed arguments, which is undesirable.
private Long deadlineNanoTime;
@Nullable
private Compressor compressor;
@Nullable
private String authority;
@ -114,25 +110,6 @@ public final class CallOptions {
return deadlineNanoTime;
}
/**
* Returns the compressor, or {@code null} if none is set.
*/
@Nullable
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/492")
public Compressor getCompressor() {
return compressor;
}
/**
* Use the desired compression.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/492")
public CallOptions withCompressor(@Nullable Compressor compressor) {
CallOptions newOptions = new CallOptions(this);
newOptions.compressor = compressor;
return newOptions;
}
/**
* Returns a new {@code CallOptions} with a request key for affinity-based routing.
*/
@ -175,7 +152,6 @@ public final class CallOptions {
*/
private CallOptions(CallOptions other) {
deadlineNanoTime = other.deadlineNanoTime;
compressor = other.compressor;
authority = other.authority;
requestKey = other.requestKey;
}
@ -188,7 +164,6 @@ public final class CallOptions {
long remainingNanos = deadlineNanoTime - System.nanoTime();
toStringHelper.addValue(remainingNanos + " ns from now");
}
toStringHelper.add("compressor", compressor);
toStringHelper.add("authority", authority);
return toStringHelper.toString();

View File

@ -0,0 +1,73 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
/**
* Encloses classes related to the compression and decompression of messages.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/492")
@ThreadSafe
public final class CompressorRegistry {
private static final CompressorRegistry DEFAULT_INSTANCE = new CompressorRegistry(
new Codec.Gzip());
public static CompressorRegistry getDefaultInstance() {
return DEFAULT_INSTANCE;
}
public static CompressorRegistry newEmptyInstance() {
return new CompressorRegistry();
}
private final ConcurrentMap<String, Compressor> compressors;
@VisibleForTesting
CompressorRegistry(Compressor ...cs) {
compressors = new ConcurrentHashMap<String, Compressor>();
for (Compressor c : cs) {
compressors.put(c.getMessageEncoding(), c);
}
}
@Nullable
public Compressor lookupCompressor(String compressorName) {
return compressors.get(compressorName);
}
}

View File

@ -31,7 +31,7 @@
package io.grpc.inprocess;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
@ -215,12 +215,6 @@ class InProcessTransport implements ServerTransport, ClientTransport {
clientStreamListener = listener;
}
@Override
public void setCompressor(Compressor c) {
// I don't *think* there is any good reason to do this, so just throw away the compressor
// intentional nop
}
@Override
public void setDecompressionRegistry(DecompressorRegistry registry) {}
@ -334,6 +328,12 @@ class InProcessTransport implements ServerTransport, ClientTransport {
public void setMessageCompression(boolean enable) {
// noop
}
@Override
public void pickCompressor(Iterable<String> messageEncodings) {}
@Override
public void setCompressionRegistry(CompressorRegistry registry) {}
}
private class InProcessClientStream implements ClientStream {
@ -440,18 +440,17 @@ class InProcessTransport implements ServerTransport, ClientTransport {
}
}
@Override
public void setCompressor(Compressor c) {
// nop
}
@Override
public void setDecompressionRegistry(DecompressorRegistry registry) {}
@Override
public void setMessageCompression(boolean enable) {
// noop
}
public void setMessageCompression(boolean enable) {}
@Override
public void pickCompressor(Iterable<String> messageEncodings) {}
@Override
public void setCompressionRegistry(CompressorRegistry registry) {}
}
}
}

View File

@ -32,6 +32,7 @@
package io.grpc.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITER;
import com.google.common.base.Preconditions;
@ -147,6 +148,10 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
return;
}
}
if (headers.containsKey(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY)) {
pickCompressor(
ACCEPT_ENCODING_SPLITER.split(headers.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY)));
}
inboundPhase(Phase.MESSAGE);
}

View File

@ -40,6 +40,7 @@ import com.google.common.base.MoreObjects;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
@ -102,6 +103,8 @@ public abstract class AbstractStream<IdT> implements Stream {
private final Object onReadyLock = new Object();
private volatile DecompressorRegistry decompressorRegistry =
DecompressorRegistry.getDefaultInstance();
private volatile CompressorRegistry compressorRegistry =
CompressorRegistry.getDefaultInstance();
@VisibleForTesting
class FramerSink implements MessageFramer.Sink {
@ -302,15 +305,6 @@ public abstract class AbstractStream<IdT> implements Stream {
}
}
/**
* Set the decompressor for this stream. This may be called at most once. Typically this is set
* after the message encoding header is provided by the remote host, but before any messages are
* received.
*/
protected final void setDecompressor(Decompressor d) {
deframer.setDecompressor(d);
}
/**
* Looks up the decompressor by its message encoding name, and sets it for this stream.
* Decompressors are registered with {@link DecompressorRegistry#register}.
@ -318,11 +312,11 @@ public abstract class AbstractStream<IdT> implements Stream {
* @param messageEncoding the name of the encoding provided by the remote host
* @throws IllegalArgumentException if the provided message encoding cannot be found.
*/
public final void setDecompressor(String messageEncoding) {
protected final void setDecompressor(String messageEncoding) {
Decompressor d = decompressorRegistry.lookupDecompressor(messageEncoding);
checkArgument(d != null,
"Unable to find decompressor for message encoding %s", messageEncoding);
setDecompressor(d);
deframer.setDecompressor(d);
}
@Override
@ -331,10 +325,21 @@ public abstract class AbstractStream<IdT> implements Stream {
}
@Override
public void setCompressor(Compressor c) {
// TODO(carl-mastrangelo): check that headers haven't already been sent. I can't find where
// the client stream changes outbound phase correctly, so I am ignoring it.
framer.setCompressor(c);
public final void setCompressionRegistry(CompressorRegistry registry) {
compressorRegistry = checkNotNull(registry);
}
@Override
public final void pickCompressor(Iterable<String> messageEncodings) {
for (String messageEncoding : messageEncodings) {
Compressor c = compressorRegistry.lookupCompressor(messageEncoding);
if (c != null) {
// TODO(carl-mastrangelo): check that headers haven't already been sent. I can't find where
// the client stream changes outbound phase correctly, so I am ignoring it.
framer.setCompressor(c);
break;
}
}
}
/**

View File

@ -31,8 +31,11 @@
package io.grpc.internal;
import static com.google.common.collect.Iterables.addAll;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITER;
import static io.grpc.internal.GrpcUtil.AUTHORITY_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
@ -50,6 +53,7 @@ import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
@ -58,6 +62,8 @@ import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import java.io.InputStream;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@ -82,7 +88,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
private final ClientTransportProvider clientTransportProvider;
private String userAgent;
private ScheduledExecutorService deadlineCancellationExecutor;
private Set<String> knownMessageEncodingRegistry;
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
ClientCallImpl(MethodDescriptor<ReqT, RespT> method, Executor executor,
CallOptions callOptions, ClientTransportProvider clientTransportProvider,
@ -129,9 +137,24 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
return this;
}
ClientCallImpl<ReqT, RespT> setCompressorRegistry(CompressorRegistry compressorRegistry) {
this.compressorRegistry = compressorRegistry;
return this;
}
/**
* Sets encodings known to be supported by the server. This set MUST be thread safe, and MAY be
* modified by any code as it learns about new supported encodings.
*/
ClientCallImpl<ReqT, RespT> setKnownMessageEncodingRegistry(Set<String> knownMessageEncodings) {
this.knownMessageEncodingRegistry = knownMessageEncodings;
return this;
}
@VisibleForTesting
static void prepareHeaders(Metadata headers, CallOptions callOptions, String userAgent,
DecompressorRegistry decompressorRegistry) {
Set<String> knownMessageEncodings, DecompressorRegistry decompressorRegistry,
CompressorRegistry compressorRegistry) {
// Hack to propagate authority. This should be properly pass to the transport.newStream
// somehow.
headers.removeAll(AUTHORITY_KEY);
@ -146,16 +169,19 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
}
headers.removeAll(MESSAGE_ENCODING_KEY);
Compressor compressor = callOptions.getCompressor();
if (compressor != null && compressor != Codec.Identity.NONE) {
headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
for (String messageEncoding : knownMessageEncodings) {
Compressor compressor = compressorRegistry.lookupCompressor(messageEncoding);
if (compressor != null && compressor != Codec.Identity.NONE) {
headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
break;
}
}
headers.removeAll(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY);
headers.removeAll(MESSAGE_ACCEPT_ENCODING_KEY);
if (!decompressorRegistry.getAdvertisedMessageEncodings().isEmpty()) {
String acceptEncoding =
Joiner.on(',').join(decompressorRegistry.getAdvertisedMessageEncodings());
headers.put(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, acceptEncoding);
headers.put(MESSAGE_ACCEPT_ENCODING_KEY, acceptEncoding);
}
}
@ -175,7 +201,8 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
});
return;
}
prepareHeaders(headers, callOptions, userAgent, decompressorRegistry);
prepareHeaders(headers, callOptions, userAgent,
knownMessageEncodingRegistry, decompressorRegistry, compressorRegistry);
ClientStreamListener listener = new ClientStreamListenerImpl(observer);
ListenableFuture<ClientTransport> transportFuture = clientTransportProvider.get(callOptions);
@ -203,9 +230,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
}
stream.setDecompressionRegistry(decompressorRegistry);
Compressor compressor = callOptions.getCompressor();
if (compressor != null) {
stream.setCompressor(compressor);
stream.setCompressionRegistry(compressorRegistry);
if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
stream.pickCompressor(Collections.singleton(headers.get(MESSAGE_ENCODING_KEY)));
// TODO(carl-mastrangelo): move this to ClientCall.
stream.setMessageCompression(true);
}
@ -337,6 +364,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
@Override
public void headersRead(final Metadata headers) {
if (headers.containsKey(MESSAGE_ACCEPT_ENCODING_KEY)) {
// TODO(carl-mastrangelo): after the first time we contact the server, it almost certainly
// won't change. It might be possible to recover performance by not adding to the known
// encodings if it isn't empty.
String serverAcceptEncodings = headers.get(MESSAGE_ACCEPT_ENCODING_KEY);
addAll(knownMessageEncodingRegistry, ACCEPT_ENCODING_SPLITER.split(serverAcceptEncodings));
}
callExecutor.execute(new ContextRunnable(context) {
@Override
public final void runInContext() {

View File

@ -34,6 +34,7 @@ package io.grpc.internal;
import static com.google.common.base.Preconditions.checkState;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.Status;
@ -61,6 +62,9 @@ class DelayedStream implements ClientStream {
@GuardedBy("this")
private Compressor compressor;
@GuardedBy("this")
private Iterable<String> compressionMessageEncodings;
// Can be either a Decompressor or a String
@GuardedBy("this")
private Object decompressor;
@ -75,6 +79,8 @@ class DelayedStream implements ClientStream {
private int pendingFlowControlRequests;
@GuardedBy("this")
private boolean pendingFlush;
@GuardedBy("lock")
private CompressorRegistry compressionRegistry;
static final class PendingMessage {
final InputStream message;
@ -104,12 +110,15 @@ class DelayedStream implements ClientStream {
}
checkState(realStream == null, "Stream already created: %s", realStream);
realStream = stream;
if (compressor != null) {
realStream.setCompressor(compressor);
if (compressionMessageEncodings != null) {
realStream.pickCompressor(compressionMessageEncodings);
}
if (this.decompressionRegistry != null) {
realStream.setDecompressionRegistry(this.decompressionRegistry);
}
if (this.compressionRegistry != null) {
realStream.setCompressionRegistry(this.compressionRegistry);
}
for (PendingMessage message : pendingMessages) {
realStream.setMessageCompression(message.shouldBeCompressed);
realStream.writeMessage(message.message);
@ -206,11 +215,21 @@ class DelayedStream implements ClientStream {
}
@Override
public void setCompressor(Compressor c) {
public void pickCompressor(Iterable<String> messageEncodings) {
synchronized (this) {
compressor = c;
compressionMessageEncodings = messageEncodings;
if (realStream != null) {
realStream.setCompressor(c);
realStream.pickCompressor(messageEncodings);
}
}
}
@Override
public void setCompressionRegistry(CompressorRegistry registry) {
synchronized (this) {
this.compressionRegistry = registry;
if (realStream != null) {
realStream.setCompressionRegistry(registry);
}
}
}

View File

@ -37,6 +37,7 @@ import static io.grpc.Status.Code.DEADLINE_EXCEEDED;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.Metadata;
@ -156,6 +157,8 @@ public final class GrpcUtil {
public static final Set<Status.Code> CANCEL_REASONS =
EnumSet.of(CANCELLED, DEADLINE_EXCEEDED, Status.Code.INTERNAL, Status.Code.UNKNOWN);
public static final Splitter ACCEPT_ENCODING_SPLITER = Splitter.on(',').trimResults();
/**
* Maps HTTP error response status codes to transport codes.
*/

View File

@ -44,11 +44,9 @@ import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.EquivalentAddressGroup;
import io.grpc.ExperimentalApi;
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
@ -62,9 +60,12 @@ import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@ -95,8 +96,21 @@ public final class ManagedChannelImpl extends ManagedChannel {
private final String userAgent;
private final Object lock = new Object();
/* Compression related */
/**
* When a client connects to a server, it does not know what encodings are supported. This set
* is the union of all accept-encoding headers the server has sent. It is used to pick an
* encoding when contacting the server again. One problem with the gRPC protocol is that if
* there is only one RPC made (perhaps streaming, or otherwise long lived) an encoding will not
* be selected. To combat this you can preflight a request to the server to fill in the mapping
* for the next one. A better solution is if you have prior knowledge that the server supports
* an encoding, and fill this structure before the request.
*/
private final Set<String> knownAcceptEncodingRegistry =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final DecompressorRegistry decompressorRegistry =
DecompressorRegistry.getDefaultInstance();
private final CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
/**
* Executor that runs deadline timers for requests.
@ -126,8 +140,6 @@ public final class ManagedChannelImpl extends ManagedChannel {
@GuardedBy("lock")
private boolean terminated;
private volatile Compressor defaultCompressor;
private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
@Override
public ListenableFuture<ClientTransport> get(CallOptions callOptions) {
@ -220,21 +232,6 @@ public final class ManagedChannelImpl extends ManagedChannel {
target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors.toString() + ")" : ""));
}
/**
* Sets the default compression method for this Channel. By default, new calls will use the
* provided compressor. Each individual Call can override this by specifying it in CallOptions.
* If the remote host does not support the message encoding, the call will likely break. There
* is currently no provided way to discover what message encodings the remote host supports.
* @param c The compressor to use. If {@code null} no compression will by performed. This is
* equivalent to using {@code Codec.Identity.NONE}. If not null, the Compressor must be
* threadsafe.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/492")
public void setDefaultCompressor(@Nullable Compressor c) {
defaultCompressor = (c != null) ? c : Codec.Identity.NONE;
}
/**
* Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
* cancelled.
@ -311,10 +308,6 @@ public final class ManagedChannelImpl extends ManagedChannel {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions) {
boolean hasCodecOverride = callOptions.getCompressor() != null;
if (!hasCodecOverride && defaultCompressor != Codec.Identity.NONE) {
callOptions = callOptions.withCompressor(defaultCompressor);
}
return interceptorChannel.newCall(method, callOptions);
}
@ -334,7 +327,9 @@ public final class ManagedChannelImpl extends ManagedChannel {
transportProvider,
scheduledExecutor)
.setUserAgent(userAgent)
.setDecompressorRegistry(decompressorRegistry);
.setDecompressorRegistry(decompressorRegistry)
.setCompressorRegistry(compressorRegistry)
.setKnownMessageEncodingRegistry(knownAcceptEncodingRegistry);
}
@Override

View File

@ -31,7 +31,7 @@
package io.grpc.internal;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.Status;
@ -63,11 +63,6 @@ public class NoopClientStream implements ClientStream {
@Override
public void halfClose() {}
@Override
public void setCompressor(Compressor c) {
// very much a nop
}
@Override
public void setDecompressionRegistry(DecompressorRegistry registry) {}
@ -75,4 +70,10 @@ public class NoopClientStream implements ClientStream {
public void setMessageCompression(boolean enable) {
// noop
}
@Override
public void pickCompressor(Iterable<String> messageEncodings) {}
@Override
public void setCompressionRegistry(CompressorRegistry registry) {}
}

View File

@ -31,7 +31,7 @@
package io.grpc.internal;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import java.io.InputStream;
@ -80,10 +80,13 @@ public interface Stream {
boolean isReady();
/**
* Sets the default message encoder for messages on this stream.
* @param c the compressor
* Picks a compressor for for this stream. If no message encodings are acceptable, compression is
* not used.
*
* @param messageEncodings a group of message encoding names that the remote endpoint is known
* to support.
*/
void setCompressor(Compressor c);
void pickCompressor(Iterable<String> messageEncodings);
/**
* Enables per-message compression, if an encoding type has been negotiated. If no message
@ -92,7 +95,7 @@ public interface Stream {
void setMessageCompression(boolean enable);
/**
* Sets the decompressor registry to use when resolving {@link #setDecompressor(String)}. If
* Sets the decompressor registry to use when resolving {@code #setDecompressor(String)}. If
* unset, the default DecompressorRegistry will be used.
*
* @see DecompressorRegistry#getDefaultInstance()
@ -100,4 +103,14 @@ public interface Stream {
* @param registry the decompressors to use.
*/
void setDecompressionRegistry(DecompressorRegistry registry);
/**
* Sets the compressor registry to use when resolving {@link #pickCompressor}. If
* unset, the default CompressorRegistry will be used.
*
* @see CompressorRegistry#getDefaultInstance()
*
* @param registry the compressors to use.
*/
void setCompressionRegistry(CompressorRegistry registry);
}

View File

@ -42,7 +42,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;
/** Unit tests for {@link CallOptions}. */
@ -50,18 +49,15 @@ import java.util.concurrent.TimeUnit;
public class CallOptionsTest {
private String sampleAuthority = "authority";
private Long sampleDeadlineNanoTime = 1L;
private Compressor sampleCompressor = new Codec.Gzip();
private RequestKey sampleRequestKey = new RequestKey();
private CallOptions allSet = CallOptions.DEFAULT
.withAuthority(sampleAuthority)
.withDeadlineNanoTime(sampleDeadlineNanoTime)
.withCompressor(sampleCompressor)
.withRequestKey(sampleRequestKey);
@Test
public void defaultsAreAllNull() {
assertNull(CallOptions.DEFAULT.getDeadlineNanoTime());
assertNull(CallOptions.DEFAULT.getCompressor());
assertNull(CallOptions.DEFAULT.getAuthority());
assertNull(CallOptions.DEFAULT.getRequestKey());
}
@ -70,7 +66,6 @@ public class CallOptionsTest {
public void allWiths() {
assertSame(sampleAuthority, allSet.getAuthority());
assertSame(sampleDeadlineNanoTime, allSet.getDeadlineNanoTime());
assertSame(sampleCompressor, allSet.getCompressor());
assertSame(sampleRequestKey, allSet.getRequestKey());
}
@ -80,8 +75,6 @@ public class CallOptionsTest {
allSet.withAuthority("blah").withAuthority(sampleAuthority)));
assertTrue(equal(allSet,
allSet.withDeadlineNanoTime(314L).withDeadlineNanoTime(sampleDeadlineNanoTime)));
assertTrue(equal(allSet,
allSet.withCompressor(Codec.Identity.NONE).withCompressor(sampleCompressor)));
assertTrue(equal(allSet,
allSet.withRequestKey(new RequestKey()).withRequestKey(sampleRequestKey)));
}
@ -109,33 +102,16 @@ public class CallOptionsTest {
@Test
public void testToString() {
Compressor gzip = new Compressor() {
@Override
public String toString() {
return "GziP";
}
@Override
public String getMessageEncoding() {
throw new UnsupportedOperationException();
}
@Override
public OutputStream compress(OutputStream os) {
throw new UnsupportedOperationException();
}
};
assertEquals("CallOptions{deadlineNanoTime=null, compressor=null, authority=null}",
assertEquals("CallOptions{deadlineNanoTime=null, authority=null}",
CallOptions.DEFAULT.toString());
// Deadline makes it hard to check string for equality.
assertEquals("CallOptions{deadlineNanoTime=null, compressor=GziP, authority=authority}",
allSet.withCompressor(gzip).withDeadlineNanoTime(null).toString());
assertEquals("CallOptions{deadlineNanoTime=null, authority=authority}",
allSet.withDeadlineNanoTime(null).toString());
assertTrue(allSet.toString().contains("deadlineNanoTime=" + sampleDeadlineNanoTime + ","));
}
private static boolean equal(CallOptions o1, CallOptions o2) {
return Objects.equal(o1.getDeadlineNanoTime(), o2.getDeadlineNanoTime())
&& Objects.equal(o1.getCompressor(), o2.getCompressor())
&& Objects.equal(o1.getAuthority(), o2.getAuthority())
&& Objects.equal(o1.getRequestKey(), o2.getRequestKey());
}

View File

@ -31,6 +31,7 @@
package io.grpc.internal;
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@ -48,7 +49,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -58,6 +58,7 @@ import com.google.common.util.concurrent.SettableFuture;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Codec;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
@ -82,6 +83,7 @@ import org.mockito.MockitoAnnotations;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
@ -103,6 +105,8 @@ public class ClientCallImplTest {
private final ScheduledExecutorService deadlineCancellationExecutor =
Executors.newScheduledThreadPool(0);
private final Set<String> knownMessageEncodings = new HashSet<String>();
private final CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
private final DecompressorRegistry decompressorRegistry =
DecompressorRegistry.getDefaultInstance();
private final MethodDescriptor<Void, Void> method = MethodDescriptor.create(
@ -164,7 +168,9 @@ public class ClientCallImplTest {
CallOptions.DEFAULT,
provider,
deadlineCancellationExecutor)
.setDecompressorRegistry(decompressorRegistry);
.setDecompressorRegistry(decompressorRegistry)
.setCompressorRegistry(compressorRegistry)
.setKnownMessageEncodingRegistry(knownMessageEncodings);
call.start(new TestClientCallListener<Void>(), new Metadata());
@ -182,8 +188,8 @@ public class ClientCallImplTest {
public void prepareHeaders_authorityAdded() {
Metadata m = new Metadata();
CallOptions callOptions = CallOptions.DEFAULT.withAuthority("auth");
ClientCallImpl.prepareHeaders(
m, callOptions, "user agent", DecompressorRegistry.getDefaultInstance());
ClientCallImpl.prepareHeaders(m, callOptions, "user agent", knownMessageEncodings,
decompressorRegistry, compressorRegistry);
assertEquals(m.get(GrpcUtil.AUTHORITY_KEY), "auth");
}
@ -191,8 +197,8 @@ public class ClientCallImplTest {
@Test
public void prepareHeaders_userAgentAdded() {
Metadata m = new Metadata();
ClientCallImpl.prepareHeaders(
m, CallOptions.DEFAULT, "user agent", DecompressorRegistry.getDefaultInstance());
ClientCallImpl.prepareHeaders(m, CallOptions.DEFAULT, "user agent", knownMessageEncodings,
decompressorRegistry, compressorRegistry);
assertEquals(m.get(GrpcUtil.USER_AGENT_KEY), "user agent");
}
@ -200,9 +206,9 @@ public class ClientCallImplTest {
@Test
public void prepareHeaders_messageEncodingAdded() {
Metadata m = new Metadata();
CallOptions callOptions = CallOptions.DEFAULT.withCompressor(new Codec.Gzip());
ClientCallImpl.prepareHeaders(
m, callOptions, "user agent", DecompressorRegistry.getDefaultInstance());
knownMessageEncodings.add(new Codec.Gzip().getMessageEncoding());
ClientCallImpl.prepareHeaders(m, CallOptions.DEFAULT, "user agent", knownMessageEncodings,
decompressorRegistry, compressorRegistry);
assertEquals(m.get(GrpcUtil.MESSAGE_ENCODING_KEY), new Codec.Gzip().getMessageEncoding());
}
@ -210,9 +216,9 @@ public class ClientCallImplTest {
@Test
public void prepareHeaders_ignoreIdentityEncoding() {
Metadata m = new Metadata();
CallOptions callOptions = CallOptions.DEFAULT.withCompressor(Codec.Identity.NONE);
ClientCallImpl.prepareHeaders(
m, callOptions, "user agent", DecompressorRegistry.getDefaultInstance());
knownMessageEncodings.add(Codec.Identity.NONE.getMessageEncoding());
ClientCallImpl.prepareHeaders(m, CallOptions.DEFAULT, "user agent", knownMessageEncodings,
decompressorRegistry, compressorRegistry);
assertNull(m.get(GrpcUtil.MESSAGE_ENCODING_KEY));
}
@ -255,10 +261,11 @@ public class ClientCallImplTest {
}
}, false); // not advertised
ClientCallImpl.prepareHeaders(m, CallOptions.DEFAULT, "user agent", customRegistry);
ClientCallImpl.prepareHeaders(m, CallOptions.DEFAULT, "user agent", knownMessageEncodings,
customRegistry, compressorRegistry);
Iterable<String> acceptedEncodings =
Splitter.on(',').split(m.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY));
ACCEPT_ENCODING_SPLITER.split(m.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY));
// Order may be different, since decoder priorities have not yet been implemented.
assertEquals(ImmutableSet.of("b", "a"), ImmutableSet.copyOf(acceptedEncodings));
@ -272,8 +279,8 @@ public class ClientCallImplTest {
m.put(GrpcUtil.MESSAGE_ENCODING_KEY, "gzip");
m.put(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, "gzip");
ClientCallImpl.prepareHeaders(
m, CallOptions.DEFAULT, null, DecompressorRegistry.newEmptyInstance());
ClientCallImpl.prepareHeaders(m, CallOptions.DEFAULT, null, knownMessageEncodings,
DecompressorRegistry.newEmptyInstance(), compressorRegistry);
assertNull(m.get(GrpcUtil.AUTHORITY_KEY));
assertNull(m.get(GrpcUtil.USER_AGENT_KEY));
@ -296,7 +303,9 @@ public class ClientCallImplTest {
CallOptions.DEFAULT,
provider,
deadlineCancellationExecutor)
.setDecompressorRegistry(decompressorRegistry);
.setDecompressorRegistry(decompressorRegistry)
.setCompressorRegistry(compressorRegistry)
.setKnownMessageEncodingRegistry(knownMessageEncodings);
Context.ROOT.attach();
@ -371,7 +380,9 @@ public class ClientCallImplTest {
CallOptions.DEFAULT,
provider,
deadlineCancellationExecutor)
.setDecompressorRegistry(decompressorRegistry);
.setDecompressorRegistry(decompressorRegistry)
.setCompressorRegistry(compressorRegistry)
.setKnownMessageEncodingRegistry(knownMessageEncodings);
previous.attach();

View File

@ -36,12 +36,9 @@ import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.grpc.Codec;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.IntegerMarshaller;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import org.junit.Before;
@ -71,10 +68,6 @@ public class DelayedStreamTest {
@Mock private ClientStream realStream;
@Captor private ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
private DelayedStream stream;
private Metadata headers = new Metadata();
private MethodDescriptor<Integer, Integer> method = MethodDescriptor.create(
MethodType.UNARY, "service/method", new IntegerMarshaller(), new IntegerMarshaller());
@Before
public void setUp() {
@ -85,10 +78,10 @@ public class DelayedStreamTest {
@Test
public void setStream_sendsAllMessages() {
stream.setCompressor(Codec.Identity.NONE);
DecompressorRegistry registry = DecompressorRegistry.newEmptyInstance();
stream.setDecompressionRegistry(registry);
DecompressorRegistry decompressors = DecompressorRegistry.newEmptyInstance();
CompressorRegistry compressors = CompressorRegistry.newEmptyInstance();
stream.setDecompressionRegistry(decompressors);
stream.setCompressionRegistry(compressors);
stream.setMessageCompression(true);
InputStream message = new ByteArrayInputStream(new byte[]{'a'});
@ -98,9 +91,8 @@ public class DelayedStreamTest {
stream.setStream(realStream);
verify(realStream).setCompressor(Codec.Identity.NONE);
verify(realStream).setDecompressionRegistry(registry);
verify(realStream).setDecompressionRegistry(decompressors);
verify(realStream).setCompressionRegistry(compressors);
// Verify that the order was correct, even though they should be interleaved with the
// writeMessage calls

View File

@ -54,6 +54,7 @@ import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.IntegerMarshaller;
import io.grpc.LoadBalancer;
@ -194,6 +195,7 @@ public class ManagedChannelImplTest {
verify(mockTransport, timeout(1000))
.newStream(same(method), same(headers), streamListenerCaptor.capture());
verify(mockStream).setDecompressionRegistry(isA(DecompressorRegistry.class));
verify(mockStream).setCompressionRegistry(isA(CompressorRegistry.class));
ClientStreamListener streamListener = streamListenerCaptor.getValue();
// Second call

View File

@ -31,7 +31,6 @@
package io.grpc.examples.experimental;
import io.grpc.Codec;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.examples.helloworld.GreeterGrpc;
@ -61,7 +60,7 @@ public class CompressingHelloWorldClient {
channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext(true)
.build();
blockingStub = GreeterGrpc.newBlockingStub(channel).withCompressor(new Codec.Gzip());
blockingStub = GreeterGrpc.newBlockingStub(channel);
}
public void shutdown() throws InterruptedException {

View File

@ -35,8 +35,6 @@ import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Compressor;
import io.grpc.ExperimentalApi;
import java.util.concurrent.TimeUnit;
@ -125,14 +123,6 @@ public abstract class AbstractStub<S extends AbstractStub<S>> {
return build(newChannel, callOptions);
}
/**
* Returns a new stub that uses the given compressor.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/492")
public final S withCompressor(Compressor c) {
return build(channel, callOptions.withCompressor(c));
}
/**
* Returns a new stub that has the given interceptors attached to the underlying channel.
*/