core: abstract channel builder to accept LoadBalancer2 (#2583)

If a LoadBalancer2 is passed in, the builder will create ManagedChannelImpl2 instead of ManagedChannelImpl. This allows us to test the LBv2 classes on a large scale.
This commit is contained in:
Kun Zhang 2017-01-10 15:30:12 -08:00 committed by GitHub
parent a3a5420922
commit d17a7b5bd4
4 changed files with 131 additions and 19 deletions

View File

@ -46,7 +46,6 @@ import io.grpc.benchmarks.proto.Control;
import io.grpc.benchmarks.proto.Messages;
import io.grpc.benchmarks.proto.Payloads;
import io.grpc.benchmarks.proto.Stats;
import io.grpc.internal.ManagedChannelImpl;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import io.netty.buffer.ByteBuf;
@ -96,7 +95,7 @@ class LoadClient {
log.log(Level.INFO, "Client Config \n" + config.toString());
this.config = config;
// Create the channels
channels = new ManagedChannelImpl[config.getClientChannels()];
channels = new ManagedChannel[config.getClientChannels()];
for (int i = 0; i < config.getClientChannels(); i++) {
channels[i] =
Utils.newClientChannel(

View File

@ -46,6 +46,8 @@ import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer2;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver;
import io.grpc.NameResolverProvider;
@ -94,6 +96,7 @@ public abstract class AbstractManagedChannelImplBuilder
@Nullable
private Executor executor;
private final List<ClientInterceptor> interceptors = new ArrayList<ClientInterceptor>();
private final String target;
@ -113,6 +116,9 @@ public abstract class AbstractManagedChannelImplBuilder
@Nullable
private LoadBalancer.Factory loadBalancerFactory;
@Nullable
private LoadBalancer2.Factory loadBalancer2Factory;
@Nullable
private DecompressorRegistry decompressorRegistry;
@ -204,6 +210,17 @@ public abstract class AbstractManagedChannelImplBuilder
return thisT();
}
/**
* DO NOT CALL THIS, as its argument type will soon be renamed.
*/
public final T loadBalancerFactory(LoadBalancer2.Factory loadBalancerFactory) {
Preconditions.checkState(directServerAddress == null,
"directServerAddress is set (%s), which forbids the use of LoadBalancerFactory",
directServerAddress);
this.loadBalancer2Factory = loadBalancerFactory;
return thisT();
}
@Override
public final T decompressorRegistry(DecompressorRegistry registry) {
this.decompressorRegistry = registry;
@ -266,7 +283,7 @@ public abstract class AbstractManagedChannelImplBuilder
}
@Override
public ManagedChannelImpl build() {
public ManagedChannel build() {
ClientTransportFactory transportFactory = buildTransportFactory();
if (authorityOverride != null) {
transportFactory = new AuthorityOverridingTransportFactory(
@ -279,6 +296,24 @@ public abstract class AbstractManagedChannelImplBuilder
// getResource(), then this shouldn't be a problem unless called on the UI thread.
nameResolverFactory = NameResolverProvider.asFactory();
}
if (loadBalancer2Factory != null) {
return new ManagedChannelImpl2(
target,
// TODO(carl-mastrangelo): Allow clients to pass this in
new ExponentialBackoffPolicy.Provider(),
nameResolverFactory,
getNameResolverParams(),
loadBalancer2Factory,
transportFactory,
firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
getExecutorPool(executor),
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
userAgent, interceptors, firstNonNull(statsFactory,
firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
} else {
return new ManagedChannelImpl(
target,
// TODO(carl-mastrangelo): Allow clients to pass this in
@ -294,6 +329,7 @@ public abstract class AbstractManagedChannelImplBuilder
firstNonNull(statsFactory,
firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
}
}
/**
* Subclasses should override this method to provide the {@link ClientTransportFactory}
@ -311,6 +347,24 @@ public abstract class AbstractManagedChannelImplBuilder
return Attributes.EMPTY;
}
private static ObjectPool<? extends Executor> getExecutorPool(final @Nullable Executor executor) {
if (executor != null) {
return new ObjectPool<Executor>() {
@Override
public Executor getObject() {
return executor;
}
@Override
public Executor returnObject(Object returned) {
return null;
}
};
} else {
return SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
}
}
private static class AuthorityOverridingTransportFactory implements ClientTransportFactory {
final ClientTransportFactory factory;
final String authorityOverride;

View File

@ -0,0 +1,59 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
/**
* An ObjectPool backed by a {@link SharedResourceHolder.Resource}.
*/
public final class SharedResourcePool<T> implements ObjectPool<T> {
private final SharedResourceHolder.Resource<T> resource;
private SharedResourcePool(SharedResourceHolder.Resource<T> resource) {
this.resource = resource;
}
public static <T> SharedResourcePool<T> forResource(SharedResourceHolder.Resource<T> resource) {
return new SharedResourcePool<T>(resource);
}
@Override
public T getObject() {
return SharedResourceHolder.get(resource);
}
@Override
@SuppressWarnings("unchecked")
public T returnObject(Object object) {
SharedResourceHolder.release(resource, (T) object);
return null;
}
}

View File

@ -43,6 +43,7 @@ import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
@ -52,7 +53,6 @@ import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.ManagedChannelImpl;
import io.grpc.internal.ServerImpl;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
@ -84,7 +84,7 @@ public class CascadingTest {
@Mock
TestServiceGrpc.TestServiceImplBase service;
private ManagedChannelImpl channel;
private ManagedChannel channel;
private ServerImpl server;
private CountDownLatch observedCancellations;
private CountDownLatch receivedCancellations;