Eliminate ForwardingChannel and switch all use cases to client interceptors.

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=79360027
This commit is contained in:
zhangkun 2014-11-06 12:31:35 -08:00 committed by Eric Anderson
parent 48e734d7e3
commit 776ff86687
4 changed files with 45 additions and 113 deletions

View File

@ -3,16 +3,17 @@ package com.google.net.stubby.auth;
import com.google.api.client.auth.oauth2.Credential;
import com.google.net.stubby.Call;
import com.google.net.stubby.Channel;
import com.google.net.stubby.ClientInterceptor;
import com.google.net.stubby.ClientInterceptors.ForwardingCall;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.context.ForwardingChannel;
import java.util.concurrent.Executor;
import javax.inject.Provider;
/** Channel wrapper that authenticates all calls with OAuth2. */
public class OAuth2ChannelInterceptor extends ForwardingChannel {
/** Client interceptor that authenticates all calls with OAuth2. */
public class OAuth2ChannelInterceptor implements ClientInterceptor {
private static final Metadata.Key<String> AUTHORIZATION =
Metadata.Key.of("Authorization", Metadata.STRING_MARSHALLER);
@ -25,16 +26,16 @@ public class OAuth2ChannelInterceptor extends ForwardingChannel {
}
};
public OAuth2ChannelInterceptor(Channel delegate, Credential credential, Executor executor) {
super(delegate);
public OAuth2ChannelInterceptor(Credential credential, Executor executor) {
this.accessTokenProvider = new OAuth2AccessTokenProvider(credential, executor);
}
@Override
public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
Channel next) {
// TODO(user): If the call fails for Auth reasons, this does not properly propagate info that
// would be in WWW-Authenticate, because it does not yet have access to the header.
return new ForwardingCall<ReqT, RespT>(delegate.newCall(method)) {
return new ForwardingCall<ReqT, RespT>(next.newCall(method)) {
@Override
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
headers.put(AUTHORIZATION, authorizationHeaderProvider.get());

View File

@ -1,84 +0,0 @@
package com.google.net.stubby.context;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.Call;
import com.google.net.stubby.Channel;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import javax.annotation.Nullable;
/**
* A {@link Channel} which forwards all of it's methods to another {@link Channel}. Implementations
* should override methods and make use of {@link ForwardingListener} and {@link ForwardingCall}
* to augment the behavior of the underlying {@link Channel}.
*/
public abstract class ForwardingChannel implements Channel {
protected final Channel delegate;
public ForwardingChannel(Channel channel) {
this.delegate = channel;
}
/**
* A {@link Call} which forwards all of it's methods to another {@link Call}.
*/
public static class ForwardingCall<RequestT,ResponseT> extends Call<RequestT,ResponseT> {
protected final Call<RequestT, ResponseT> delegate;
public ForwardingCall(Call<RequestT, ResponseT> delegate) {
this.delegate = delegate;
}
@Override
public void start(Listener<ResponseT> responseListener, Metadata.Headers headers) {
this.delegate.start(responseListener, headers);
}
@Override
public void cancel() {
this.delegate.cancel();
}
@Override
public void halfClose() {
this.delegate.halfClose();
}
@Override
public void sendPayload(RequestT payload) {
this.delegate.sendPayload(payload);
}
}
/**
* A {@link com.google.net.stubby.Call.Listener} which forwards all of its methods to another
* {@link com.google.net.stubby.Call.Listener}.
*/
public static class ForwardingListener<T> extends Call.Listener<T> {
Call.Listener<T> delegate;
public ForwardingListener(Call.Listener<T> delegate) {
this.delegate = delegate;
}
@Override
public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
return delegate.onHeaders(headers);
}
@Override
public ListenableFuture<Void> onPayload(T payload) {
return delegate.onPayload(payload);
}
@Override
public void onClose(Status status, Metadata.Trailers trailers) {
delegate.onClose(status, trailers);
}
}
}

View File

@ -2,8 +2,12 @@ package com.google.net.stubby.stub;
import com.google.common.collect.Maps;
import com.google.net.stubby.Channel;
import com.google.net.stubby.ClientInterceptor;
import com.google.net.stubby.ClientInterceptors;
import com.google.net.stubby.MethodDescriptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -52,6 +56,7 @@ public abstract class AbstractStub<S extends AbstractStub<?, ?>,
public class StubConfigBuilder {
private final Map<String, MethodDescriptor<?, ?>> methodMap;
private final List<ClientInterceptor> interceptors = new ArrayList<ClientInterceptor>();
private Channel stubChannel;
private StubConfigBuilder() {
@ -80,11 +85,20 @@ public abstract class AbstractStub<S extends AbstractStub<?, ?>,
return this;
}
/**
* Adds a client interceptor to be attached to the channel.
*/
public StubConfigBuilder addInterceptor(ClientInterceptor interceptor) {
interceptors.add(interceptor);
return this;
}
/**
* Create a new stub configuration
*/
public S build() {
return AbstractStub.this.build(stubChannel, config.build(methodMap));
return AbstractStub.this.build(ClientInterceptors.intercept(stubChannel, interceptors),
config.build(methodMap));
}
}
}

View File

@ -3,10 +3,12 @@ package com.google.net.stubby.stub;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Call;
import com.google.net.stubby.Channel;
import com.google.net.stubby.ClientInterceptor;
import com.google.net.stubby.ClientInterceptors.ForwardingCall;
import com.google.net.stubby.ClientInterceptors.ForwardingListener;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.Status;
import com.google.net.stubby.context.ForwardingChannel;
import java.util.concurrent.atomic.AtomicReference;
@ -25,27 +27,26 @@ public class MetadataUtils {
public static <T extends AbstractStub> T attachHeaders(
T stub,
final Metadata.Headers extraHeaders) {
return (T) stub.configureNewStub().setChannel(attachHeaders(stub.getChannel(), extraHeaders))
.build();
return (T) stub.configureNewStub().addInterceptor(
newAttachHeadersInterceptor(extraHeaders)).build();
}
/**
* Attach a set of request headers to a channel.
* Return a client interceptor that attaches a set of headers to requests.
*
* @param channel to channel to intercept.
* @param extraHeaders the headers to be passed by each call on the returned stub.
* @return an implementation of the channel with extraHeaders bound to each call.
* @param extraHeaders the headers to be passed by each call that is processed by the returned
* interceptor
*/
@SuppressWarnings("unchecked")
public static Channel attachHeaders(Channel channel, final Metadata.Headers extraHeaders) {
return new ForwardingChannel(channel) {
public static ClientInterceptor newAttachHeadersInterceptor(final Metadata.Headers extraHeaders) {
return new ClientInterceptor() {
@Override
public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
return new ForwardingCall<ReqT, RespT>(delegate.newCall(method)) {
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
Channel next) {
return new ForwardingCall<ReqT, RespT>(next.newCall(method)) {
@Override
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
headers.merge(extraHeaders);
delegate.start(responseListener, headers);
super.start(responseListener, headers);
}
};
}
@ -64,9 +65,8 @@ public class MetadataUtils {
T stub,
AtomicReference<Metadata.Headers> headersCapture,
AtomicReference<Metadata.Trailers> trailersCapture) {
return (T) stub.configureNewStub().setChannel(
captureMetadata(stub.getChannel(), headersCapture, trailersCapture))
.build();
return (T) stub.configureNewStub().addInterceptor(
newCaptureMetadataInterceptor(headersCapture, trailersCapture)).build();
}
/**
@ -78,18 +78,19 @@ public class MetadataUtils {
* @return an implementation of the channel with captures installed.
*/
@SuppressWarnings("unchecked")
public static Channel captureMetadata(Channel channel,
public static ClientInterceptor newCaptureMetadataInterceptor(
final AtomicReference<Metadata.Headers> headersCapture,
final AtomicReference<Metadata.Trailers> trailersCapture) {
return new ForwardingChannel(channel) {
return new ClientInterceptor() {
@Override
public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
return new ForwardingCall<ReqT, RespT>(delegate.newCall(method)) {
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
Channel next) {
return new ForwardingCall<ReqT, RespT>(next.newCall(method)) {
@Override
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
headersCapture.set(null);
trailersCapture.set(null);
delegate.start(new ForwardingListener<RespT>(responseListener) {
super.start(new ForwardingListener<RespT>(responseListener) {
@Override
public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
headersCapture.set(headers);