From 3158f91e7538424df0c1c2961fed507e4bf864f6 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Sat, 11 May 2024 10:10:02 -0700 Subject: [PATCH] rls: Guarantee backoff will update RLS picker Previously, picker was likely null if entering backoff soon after start-up. This prevented the picker from being updated and directing queued RPCs to the fallback. It would work for new RPCs if RLS returned extremely rapidly; both ManagedChannelImpl and DelayedClientTransport do a pick before enqueuing so the ManagedChannelImpl pick could request from RLS and DelayedClientTransport could use the response. So the test uses a delay to purposefully avoid that unlikely-in-real-life case. Creating a resolving OOB channel for InProcess doesn't actually change the destination from the parent, because InProcess uses directaddress. Thus the fakeRlsServerImpl is now being added to the fake backend server, because the same server is used for RLS within the test. b/333185213 --- .../java/io/grpc/rls/CachingRlsLbClient.java | 23 +++---- .../io/grpc/rls/ChildLoadBalancerHelper.java | 4 ++ .../io/grpc/rls/LbPolicyConfiguration.java | 4 ++ .../java/io/grpc/rls/RlsLoadBalancerTest.java | 60 ++++++++++++++++++- 4 files changed, 76 insertions(+), 15 deletions(-) diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index fcfd5933c8..2c94e9e0a6 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -248,6 +248,12 @@ final class CachingRlsLbClient { logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created"); } + void init() { + synchronized (lock) { + refCountedChildPolicyWrapperFactory.init(); + } + } + /** * Convert the status to UNAVAILBLE and enhance the error message. 
* @param status status as provided by server @@ -385,7 +391,7 @@ final class CachingRlsLbClient { } catch (Exception e) { createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy); // Cache updated. updateBalancingState() to reattempt picks - helper.propagateRlsError(); + helper.triggerPendingRpcProcessing(); } } } @@ -457,19 +463,8 @@ final class CachingRlsLbClient { super.updateBalancingState(newState, newPicker); } - void propagateRlsError() { - getSynchronizationContext().execute(new Runnable() { - @Override - public void run() { - if (picker != null) { - // Refresh the channel state and let pending RPCs reprocess the picker. - updateBalancingState(state, picker); - } - } - }); - } - void triggerPendingRpcProcessing() { + checkState(state != null, "updateBalancingState hasn't yet been called"); helper.getSynchronizationContext().execute( () -> super.updateBalancingState(state, picker)); } @@ -842,7 +837,7 @@ final class CachingRlsLbClient { CachingRlsLbClient build() { CachingRlsLbClient client = new CachingRlsLbClient(this); - helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker); + client.init(); return client; } } diff --git a/rls/src/main/java/io/grpc/rls/ChildLoadBalancerHelper.java b/rls/src/main/java/io/grpc/rls/ChildLoadBalancerHelper.java index 3131aba755..7a5d5dcc64 100644 --- a/rls/src/main/java/io/grpc/rls/ChildLoadBalancerHelper.java +++ b/rls/src/main/java/io/grpc/rls/ChildLoadBalancerHelper.java @@ -77,6 +77,10 @@ final class ChildLoadBalancerHelper extends ForwardingLoadBalancerHelper { this.picker = checkNotNull(picker, "picker"); } + void init() { + helper.updateBalancingState(ConnectivityState.CONNECTING, picker); + } + ChildLoadBalancerHelper forTarget(String target) { return new ChildLoadBalancerHelper(target, helper, subchannelStateManager, picker); } diff --git a/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java b/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java index 
be067732ba..4d6ceed923 100644 --- a/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java +++ b/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java @@ -225,6 +225,10 @@ final class LbPolicyConfiguration { this.childLbStatusListener = checkNotNull(childLbStatusListener, "childLbStatusListener"); } + void init() { + childLbHelperProvider.init(); + } + ChildPolicyWrapper createOrGet(String target) { // TODO(creamsoup) check if the target is valid or not RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target); diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index ce2926919b..f3986cb89d 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -134,7 +134,8 @@ public class RlsLoadBalancerTest { private final FakeHelper helperDelegate = new FakeHelper(); private final Helper helper = mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate)); - private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl(); + private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl( + fakeClock.getScheduledExecutorService()); private final Deque subchannels = new LinkedList<>(); private final FakeThrottler fakeThrottler = new FakeThrottler(); private final String channelTarget = "channelTarget"; @@ -296,6 +297,38 @@ public class RlsLoadBalancerTest { verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail"); } + @Test + public void fallbackWithDelay_succeeds() throws Exception { + fakeRlsServerImpl.setResponseDelay(100, TimeUnit.MILLISECONDS); + grpcCleanupRule.register( + InProcessServerBuilder.forName("fake-bigtable.googleapis.com") + .addService(ServerServiceDefinition.builder("com.google") + .addMethod(fakeSearchMethod, (call, headers) -> { + call.sendHeaders(new Metadata()); + call.sendMessage(null); + call.close(Status.OK, new Metadata()); + 
return new ServerCall.Listener() {}; + }) + .build()) + .addService(fakeRlsServerImpl) + .directExecutor() + .build() + .start()); + ManagedChannel channel = grpcCleanupRule.register( + InProcessChannelBuilder.forName("fake-bigtable.googleapis.com") + .defaultServiceConfig(parseJson(getServiceConfigJsonStr())) + .directExecutor() + .build()); + + StreamRecorder recorder = StreamRecorder.create(); + StreamObserver requestObserver = ClientCalls.asyncClientStreamingCall( + channel.newCall(fakeSearchMethod, CallOptions.DEFAULT), recorder); + requestObserver.onCompleted(); + fakeClock.forwardTime(100, TimeUnit.MILLISECONDS); + assertThat(recorder.awaitCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.getError()).isNull(); + } + @Test public void metricsWithRealChannel() throws Exception { grpcCleanupRule.register( @@ -308,6 +341,7 @@ public class RlsLoadBalancerTest { return new ServerCall.Listener() {}; }) .build()) + .addService(fakeRlsServerImpl) .directExecutor() .build() .start()); @@ -761,17 +795,41 @@ public class RlsLoadBalancerTest { private static final Converter RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse(); + private final ScheduledExecutorService scheduler; + private long delay; + private TimeUnit delayUnit; + + public FakeRlsServerImpl(ScheduledExecutorService scheduler) { + this.scheduler = scheduler; + } + private Map lookupTable = ImmutableMap.of(); private void setLookupTable(Map lookupTable) { this.lookupTable = checkNotNull(lookupTable, "lookupTable"); } + void setResponseDelay(long delay, TimeUnit unit) { + this.delay = delay; + this.delayUnit = unit; + } + @Override + @SuppressWarnings("FutureReturnValueIgnored") public void routeLookup(io.grpc.lookup.v1.RouteLookupRequest request, StreamObserver responseObserver) { RouteLookupResponse response = lookupTable.get(REQUEST_CONVERTER.convert(request)); + Runnable sendResponse = () -> sendResponse(response, responseObserver); + if (delay != 0) { + 
scheduler.schedule(sendResponse, delay, delayUnit); + } else { + sendResponse.run(); + } + } + + private void sendResponse(RouteLookupResponse response, + StreamObserver responseObserver) { if (response == null) { responseObserver.onError(new RuntimeException("not found")); } else {