diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index fcfd5933c8..2c94e9e0a6 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -248,6 +248,12 @@ final class CachingRlsLbClient { logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created"); } + void init() { + synchronized (lock) { + refCountedChildPolicyWrapperFactory.init(); + } + } + /** * Convert the status to UNAVAILBLE and enhance the error message. * @param status status as provided by server @@ -385,7 +391,7 @@ final class CachingRlsLbClient { } catch (Exception e) { createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy); // Cache updated. updateBalancingState() to reattempt picks - helper.propagateRlsError(); + helper.triggerPendingRpcProcessing(); } } } @@ -457,19 +463,8 @@ final class CachingRlsLbClient { super.updateBalancingState(newState, newPicker); } - void propagateRlsError() { - getSynchronizationContext().execute(new Runnable() { - @Override - public void run() { - if (picker != null) { - // Refresh the channel state and let pending RPCs reprocess the picker. - updateBalancingState(state, picker); - } - } - }); - } - void triggerPendingRpcProcessing() { + checkState(state != null, "updateBalancingState hasn't yet been called"); helper.getSynchronizationContext().execute( () -> super.updateBalancingState(state, picker)); } @@ -842,7 +837,7 @@ final class CachingRlsLbClient { CachingRlsLbClient build() { CachingRlsLbClient client = new CachingRlsLbClient(this); - helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker); + client.init(); return client; } } diff --git a/rls/src/main/java/io/grpc/rls/ChildLoadBalancerHelper.java b/rls/src/main/java/io/grpc/rls/ChildLoadBalancerHelper.java index 3131aba755..7a5d5dcc64 100644 --- a/rls/src/main/java/io/grpc/rls/ChildLoadBalancerHelper.java +++ b/rls/src/main/java/io/grpc/rls/ChildLoadBalancerHelper.java @@ -77,6 +77,10 @@ final class ChildLoadBalancerHelper extends ForwardingLoadBalancerHelper { this.picker = checkNotNull(picker, "picker"); } + void init() { + helper.updateBalancingState(ConnectivityState.CONNECTING, picker); + } + ChildLoadBalancerHelper forTarget(String target) { return new ChildLoadBalancerHelper(target, helper, subchannelStateManager, picker); } diff --git a/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java b/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java index be067732ba..4d6ceed923 100644 --- a/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java +++ b/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java @@ -225,6 +225,10 @@ final class LbPolicyConfiguration { this.childLbStatusListener = checkNotNull(childLbStatusListener, "childLbStatusListener"); } + void init() { + childLbHelperProvider.init(); + } + ChildPolicyWrapper createOrGet(String target) { // TODO(creamsoup) check if the target is valid or not RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target); diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index ce2926919b..f3986cb89d 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -134,7 +134,8 @@ public class RlsLoadBalancerTest { private final FakeHelper helperDelegate = new FakeHelper(); private final Helper helper = mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate)); - private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl(); + private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl( + fakeClock.getScheduledExecutorService()); private final Deque subchannels = new LinkedList<>(); private final FakeThrottler fakeThrottler = new FakeThrottler(); private final String channelTarget = "channelTarget"; @@ -296,6 +297,38 @@ public class RlsLoadBalancerTest { verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail"); } + @Test + public void fallbackWithDelay_succeeds() throws Exception { + fakeRlsServerImpl.setResponseDelay(100, TimeUnit.MILLISECONDS); + grpcCleanupRule.register( + InProcessServerBuilder.forName("fake-bigtable.googleapis.com") + .addService(ServerServiceDefinition.builder("com.google") + .addMethod(fakeSearchMethod, (call, headers) -> { + call.sendHeaders(new Metadata()); + call.sendMessage(null); + call.close(Status.OK, new Metadata()); + return new ServerCall.Listener() {}; + }) + .build()) + .addService(fakeRlsServerImpl) + .directExecutor() + .build() + .start()); + ManagedChannel channel = grpcCleanupRule.register( + InProcessChannelBuilder.forName("fake-bigtable.googleapis.com") + .defaultServiceConfig(parseJson(getServiceConfigJsonStr())) + .directExecutor() + .build()); + + StreamRecorder recorder = StreamRecorder.create(); + StreamObserver requestObserver = ClientCalls.asyncClientStreamingCall( + channel.newCall(fakeSearchMethod, CallOptions.DEFAULT), recorder); + requestObserver.onCompleted(); + fakeClock.forwardTime(100, TimeUnit.MILLISECONDS); + assertThat(recorder.awaitCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.getError()).isNull(); + } + @Test public void metricsWithRealChannel() throws Exception { grpcCleanupRule.register( @@ -308,6 +341,7 @@ public class RlsLoadBalancerTest { return new ServerCall.Listener() {}; }) .build()) + .addService(fakeRlsServerImpl) .directExecutor() .build() .start()); @@ -761,17 +795,41 @@ public class RlsLoadBalancerTest { private static final Converter RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse(); + private final ScheduledExecutorService scheduler; + private long delay; + private TimeUnit delayUnit; + + public FakeRlsServerImpl(ScheduledExecutorService scheduler) { + this.scheduler = scheduler; + } + private Map lookupTable = ImmutableMap.of(); private void setLookupTable(Map lookupTable) { this.lookupTable = checkNotNull(lookupTable, "lookupTable"); } + void setResponseDelay(long delay, TimeUnit unit) { + this.delay = delay; + this.delayUnit = unit; + } + @Override + @SuppressWarnings("FutureReturnValueIgnored") public void routeLookup(io.grpc.lookup.v1.RouteLookupRequest request, StreamObserver responseObserver) { RouteLookupResponse response = lookupTable.get(REQUEST_CONVERTER.convert(request)); + Runnable sendResponse = () -> sendResponse(response, responseObserver); + if (delay != 0) { + scheduler.schedule(sendResponse, delay, delayUnit); + } else { + sendResponse.run(); + } + } + + private void sendResponse(RouteLookupResponse response, + StreamObserver responseObserver) { if (response == null) { responseObserver.onError(new RuntimeException("not found")); } else {