mirror of https://github.com/grpc/grpc-java.git
rls: Guarantee backoff will update RLS picker
Previously, picker was likely null if entering backoff soon after start-up. This prevented the picker from being updated and directing queued RPCs to the fallback. It would work for new RPCs if RLS returned extremely rapidly; both ManagedChannelImpl and DelayedClientTransport do a pick before enqueuing so the ManagedChannelImpl pick could request from RLS and DelayedClientTransport could use the response. So the test uses a delay to purposefully avoid that unlikely-in-real-life case. Creating a resolving OOB channel for InProcess doesn't actually change the destination from the parent, because InProcess uses directaddress. Thus the fakeRlsServiceImpl is now being added to the fake backend server, because the same server is used for RLS within the test. b/333185213
This commit is contained in:
parent
80f872e3a6
commit
3158f91e75
|
@ -248,6 +248,12 @@ final class CachingRlsLbClient {
|
|||
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
|
||||
}
|
||||
|
||||
void init() {
|
||||
synchronized (lock) {
|
||||
refCountedChildPolicyWrapperFactory.init();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the status to UNAVAILBLE and enhance the error message.
|
||||
* @param status status as provided by server
|
||||
|
@ -385,7 +391,7 @@ final class CachingRlsLbClient {
|
|||
} catch (Exception e) {
|
||||
createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy);
|
||||
// Cache updated. updateBalancingState() to reattempt picks
|
||||
helper.propagateRlsError();
|
||||
helper.triggerPendingRpcProcessing();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -457,19 +463,8 @@ final class CachingRlsLbClient {
|
|||
super.updateBalancingState(newState, newPicker);
|
||||
}
|
||||
|
||||
void propagateRlsError() {
|
||||
getSynchronizationContext().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (picker != null) {
|
||||
// Refresh the channel state and let pending RPCs reprocess the picker.
|
||||
updateBalancingState(state, picker);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void triggerPendingRpcProcessing() {
|
||||
checkState(state != null, "updateBalancingState hasn't yet been called");
|
||||
helper.getSynchronizationContext().execute(
|
||||
() -> super.updateBalancingState(state, picker));
|
||||
}
|
||||
|
@ -842,7 +837,7 @@ final class CachingRlsLbClient {
|
|||
|
||||
CachingRlsLbClient build() {
|
||||
CachingRlsLbClient client = new CachingRlsLbClient(this);
|
||||
helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker);
|
||||
client.init();
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,6 +77,10 @@ final class ChildLoadBalancerHelper extends ForwardingLoadBalancerHelper {
|
|||
this.picker = checkNotNull(picker, "picker");
|
||||
}
|
||||
|
||||
void init() {
|
||||
helper.updateBalancingState(ConnectivityState.CONNECTING, picker);
|
||||
}
|
||||
|
||||
ChildLoadBalancerHelper forTarget(String target) {
|
||||
return new ChildLoadBalancerHelper(target, helper, subchannelStateManager, picker);
|
||||
}
|
||||
|
|
|
@ -225,6 +225,10 @@ final class LbPolicyConfiguration {
|
|||
this.childLbStatusListener = checkNotNull(childLbStatusListener, "childLbStatusListener");
|
||||
}
|
||||
|
||||
void init() {
|
||||
childLbHelperProvider.init();
|
||||
}
|
||||
|
||||
ChildPolicyWrapper createOrGet(String target) {
|
||||
// TODO(creamsoup) check if the target is valid or not
|
||||
RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target);
|
||||
|
|
|
@ -134,7 +134,8 @@ public class RlsLoadBalancerTest {
|
|||
private final FakeHelper helperDelegate = new FakeHelper();
|
||||
private final Helper helper =
|
||||
mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate));
|
||||
private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl();
|
||||
private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl(
|
||||
fakeClock.getScheduledExecutorService());
|
||||
private final Deque<FakeSubchannel> subchannels = new LinkedList<>();
|
||||
private final FakeThrottler fakeThrottler = new FakeThrottler();
|
||||
private final String channelTarget = "channelTarget";
|
||||
|
@ -296,6 +297,38 @@ public class RlsLoadBalancerTest {
|
|||
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fallbackWithDelay_succeeds() throws Exception {
|
||||
fakeRlsServerImpl.setResponseDelay(100, TimeUnit.MILLISECONDS);
|
||||
grpcCleanupRule.register(
|
||||
InProcessServerBuilder.forName("fake-bigtable.googleapis.com")
|
||||
.addService(ServerServiceDefinition.builder("com.google")
|
||||
.addMethod(fakeSearchMethod, (call, headers) -> {
|
||||
call.sendHeaders(new Metadata());
|
||||
call.sendMessage(null);
|
||||
call.close(Status.OK, new Metadata());
|
||||
return new ServerCall.Listener<Void>() {};
|
||||
})
|
||||
.build())
|
||||
.addService(fakeRlsServerImpl)
|
||||
.directExecutor()
|
||||
.build()
|
||||
.start());
|
||||
ManagedChannel channel = grpcCleanupRule.register(
|
||||
InProcessChannelBuilder.forName("fake-bigtable.googleapis.com")
|
||||
.defaultServiceConfig(parseJson(getServiceConfigJsonStr()))
|
||||
.directExecutor()
|
||||
.build());
|
||||
|
||||
StreamRecorder<Void> recorder = StreamRecorder.create();
|
||||
StreamObserver<Void> requestObserver = ClientCalls.asyncClientStreamingCall(
|
||||
channel.newCall(fakeSearchMethod, CallOptions.DEFAULT), recorder);
|
||||
requestObserver.onCompleted();
|
||||
fakeClock.forwardTime(100, TimeUnit.MILLISECONDS);
|
||||
assertThat(recorder.awaitCompletion(10, TimeUnit.SECONDS)).isTrue();
|
||||
assertThat(recorder.getError()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void metricsWithRealChannel() throws Exception {
|
||||
grpcCleanupRule.register(
|
||||
|
@ -308,6 +341,7 @@ public class RlsLoadBalancerTest {
|
|||
return new ServerCall.Listener<Void>() {};
|
||||
})
|
||||
.build())
|
||||
.addService(fakeRlsServerImpl)
|
||||
.directExecutor()
|
||||
.build()
|
||||
.start());
|
||||
|
@ -761,17 +795,41 @@ public class RlsLoadBalancerTest {
|
|||
private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
|
||||
RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
|
||||
|
||||
private final ScheduledExecutorService scheduler;
|
||||
private long delay;
|
||||
private TimeUnit delayUnit;
|
||||
|
||||
public FakeRlsServerImpl(ScheduledExecutorService scheduler) {
|
||||
this.scheduler = scheduler;
|
||||
}
|
||||
|
||||
private Map<RouteLookupRequest, RouteLookupResponse> lookupTable = ImmutableMap.of();
|
||||
|
||||
private void setLookupTable(Map<RouteLookupRequest, RouteLookupResponse> lookupTable) {
|
||||
this.lookupTable = checkNotNull(lookupTable, "lookupTable");
|
||||
}
|
||||
|
||||
void setResponseDelay(long delay, TimeUnit unit) {
|
||||
this.delay = delay;
|
||||
this.delayUnit = unit;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("FutureReturnValueIgnored")
|
||||
public void routeLookup(io.grpc.lookup.v1.RouteLookupRequest request,
|
||||
StreamObserver<io.grpc.lookup.v1.RouteLookupResponse> responseObserver) {
|
||||
RouteLookupResponse response =
|
||||
lookupTable.get(REQUEST_CONVERTER.convert(request));
|
||||
Runnable sendResponse = () -> sendResponse(response, responseObserver);
|
||||
if (delay != 0) {
|
||||
scheduler.schedule(sendResponse, delay, delayUnit);
|
||||
} else {
|
||||
sendResponse.run();
|
||||
}
|
||||
}
|
||||
|
||||
private void sendResponse(RouteLookupResponse response,
|
||||
StreamObserver<io.grpc.lookup.v1.RouteLookupResponse> responseObserver) {
|
||||
if (response == null) {
|
||||
responseObserver.onError(new RuntimeException("not found"));
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue