mirror of https://github.com/grpc/grpc-java.git
parent
e36ad47695
commit
84f8bac8d4
|
@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkState;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import com.google.protobuf.Struct;
|
||||
|
@ -41,6 +43,8 @@ import io.grpc.EquivalentAddressGroup;
|
|||
import io.grpc.LoadBalancer.Helper;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.SynchronizationContext.ScheduledHandle;
|
||||
import io.grpc.internal.BackoffPolicy;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
|
@ -49,6 +53,8 @@ import java.util.Collections;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.annotation.CheckForNull;
|
||||
|
||||
/**
|
||||
* ADS client implementation.
|
||||
|
@ -56,7 +62,14 @@ import java.util.Map;
|
|||
final class XdsComms {
|
||||
private final ManagedChannel channel;
|
||||
private final Helper helper;
|
||||
private final BackoffPolicy.Provider backoffPolicyProvider;
|
||||
private final Supplier<Stopwatch> stopwatchSupplier;
|
||||
|
||||
@CheckForNull
|
||||
private ScheduledHandle adsRpcRetryTimer;
|
||||
|
||||
// never null
|
||||
private BackoffPolicy adsRpcRetryPolicy;
|
||||
// never null
|
||||
private AdsStream adsStream;
|
||||
|
||||
|
@ -169,9 +182,12 @@ final class XdsComms {
|
|||
|
||||
final StreamObserver<DiscoveryRequest> xdsRequestWriter;
|
||||
|
||||
final Stopwatch retryStopwatch = stopwatchSupplier.get().start();
|
||||
|
||||
final StreamObserver<DiscoveryResponse> xdsResponseReader =
|
||||
new StreamObserver<DiscoveryResponse>() {
|
||||
|
||||
// Must be accessed in SynchronizationContext
|
||||
boolean firstEdsResponseReceived;
|
||||
|
||||
@Override
|
||||
|
@ -193,6 +209,7 @@ final class XdsComms {
|
|||
} catch (InvalidProtocolBufferException | RuntimeException e) {
|
||||
cancelRpc("Received invalid EDS response", e);
|
||||
adsStreamCallback.onError();
|
||||
scheduleRetry();
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -254,12 +271,12 @@ final class XdsComms {
|
|||
new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// TODO: schedule retry
|
||||
closed = true;
|
||||
if (cancelled) {
|
||||
return;
|
||||
}
|
||||
adsStreamCallback.onError();
|
||||
scheduleRetry();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -269,6 +286,42 @@ final class XdsComms {
|
|||
onError(Status.INTERNAL.withDescription("Server closed the ADS streaming RPC")
|
||||
.asException());
|
||||
}
|
||||
|
||||
// run in SynchronizationContext
|
||||
void scheduleRetry() {
|
||||
if (channel.isShutdown()) {
|
||||
return;
|
||||
}
|
||||
|
||||
checkState(
|
||||
cancelled || closed,
|
||||
"Scheduling retry while the stream is neither cancelled nor closed");
|
||||
|
||||
checkState(
|
||||
adsRpcRetryTimer == null, "Scheduling retry while a retry is already pending");
|
||||
|
||||
class AdsRpcRetryTask implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
adsRpcRetryTimer = null;
|
||||
refreshAdsStream();
|
||||
}
|
||||
}
|
||||
|
||||
if (firstEdsResponseReceived) {
|
||||
// Reset the backoff sequence if balancer has sent the initial response
|
||||
adsRpcRetryPolicy = backoffPolicyProvider.get();
|
||||
// Retry immediately
|
||||
helper.getSynchronizationContext().execute(new AdsRpcRetryTask());
|
||||
return;
|
||||
}
|
||||
|
||||
adsRpcRetryTimer = helper.getSynchronizationContext().schedule(
|
||||
new AdsRpcRetryTask(),
|
||||
adsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS),
|
||||
TimeUnit.NANOSECONDS,
|
||||
helper.getScheduledExecutorService());
|
||||
}
|
||||
};
|
||||
|
||||
boolean cancelled;
|
||||
|
@ -280,6 +333,7 @@ final class XdsComms {
|
|||
.streamAggregatedResources(xdsResponseReader);
|
||||
this.localityStore = localityStore;
|
||||
|
||||
checkState(adsRpcRetryTimer == null, "Creating AdsStream while retry is pending");
|
||||
// Assuming standard mode, and send EDS request only
|
||||
DiscoveryRequest edsRequest =
|
||||
DiscoveryRequest.newBuilder()
|
||||
|
@ -301,6 +355,7 @@ final class XdsComms {
|
|||
this(adsStream.adsStreamCallback, adsStream.localityStore);
|
||||
}
|
||||
|
||||
// run in SynchronizationContext
|
||||
void cancelRpc(String message, Throwable cause) {
|
||||
if (cancelled) {
|
||||
return;
|
||||
|
@ -341,26 +396,42 @@ final class XdsComms {
|
|||
*/
|
||||
XdsComms(
|
||||
ManagedChannel channel, Helper helper, AdsStreamCallback adsStreamCallback,
|
||||
LocalityStore localityStore) {
|
||||
LocalityStore localityStore, BackoffPolicy.Provider backoffPolicyProvider,
|
||||
Supplier<Stopwatch> stopwatchSupplier) {
|
||||
this.channel = checkNotNull(channel, "channel");
|
||||
this.helper = checkNotNull(helper, "helper");
|
||||
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
|
||||
this.adsStream = new AdsStream(
|
||||
checkNotNull(adsStreamCallback, "adsStreamCallback"),
|
||||
checkNotNull(localityStore, "localityStore"));
|
||||
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
|
||||
this.adsRpcRetryPolicy = backoffPolicyProvider.get();
|
||||
}
|
||||
|
||||
// run in SynchronizationContext
|
||||
void refreshAdsStream() {
|
||||
checkState(!channel.isShutdown(), "channel is alreday shutdown");
|
||||
|
||||
if (adsStream.closed || adsStream.cancelled) {
|
||||
cancelRetryTimer();
|
||||
adsStream = new AdsStream(adsStream);
|
||||
}
|
||||
}
|
||||
|
||||
// run in SynchronizationContext
|
||||
// TODO: Change method name to shutdown or shutdownXdsComms if that gives better semantics (
|
||||
// cancel LB RPC and clean up retry timer).
|
||||
void shutdownLbRpc(String message) {
|
||||
adsStream.cancelRpc(message, null);
|
||||
cancelRetryTimer();
|
||||
}
|
||||
|
||||
// run in SynchronizationContext
|
||||
private void cancelRetryTimer() {
|
||||
if (adsRpcRetryTimer != null) {
|
||||
adsRpcRetryTimer.cancel();
|
||||
adsRpcRetryTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -25,6 +25,8 @@ import io.grpc.LoadBalancer.Helper;
|
|||
import io.grpc.LoadBalancer.Subchannel;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.BackoffPolicy;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||
import io.grpc.xds.XdsComms.AdsStreamCallback;
|
||||
import java.util.List;
|
||||
|
@ -55,6 +57,7 @@ class XdsLbState {
|
|||
private final Helper helper;
|
||||
private final ManagedChannel channel;
|
||||
private final AdsStreamCallback adsStreamCallback;
|
||||
private final BackoffPolicy.Provider backoffPolicyProvider;
|
||||
|
||||
@Nullable
|
||||
private XdsComms xdsComms;
|
||||
|
@ -65,13 +68,15 @@ class XdsLbState {
|
|||
Helper helper,
|
||||
LocalityStore localityStore,
|
||||
ManagedChannel channel,
|
||||
AdsStreamCallback adsStreamCallback) {
|
||||
AdsStreamCallback adsStreamCallback,
|
||||
BackoffPolicy.Provider backoffPolicyProvider) {
|
||||
this.balancerName = checkNotNull(balancerName, "balancerName");
|
||||
this.childPolicy = childPolicy;
|
||||
this.helper = checkNotNull(helper, "helper");
|
||||
this.localityStore = checkNotNull(localityStore, "localityStore");
|
||||
this.channel = checkNotNull(channel, "channel");
|
||||
this.adsStreamCallback = checkNotNull(adsStreamCallback, "adsStreamCallback");
|
||||
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
|
||||
}
|
||||
|
||||
final void handleResolvedAddressGroups(
|
||||
|
@ -82,7 +87,9 @@ class XdsLbState {
|
|||
xdsComms.refreshAdsStream();
|
||||
} else {
|
||||
// TODO(zdapeng): pass a helper that has the right ChannelLogger for the oobChannel
|
||||
xdsComms = new XdsComms(channel, helper, adsStreamCallback, localityStore);
|
||||
xdsComms = new XdsComms(
|
||||
channel, helper, adsStreamCallback, localityStore, backoffPolicyProvider,
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
}
|
||||
|
||||
// TODO: maybe update picker
|
||||
|
|
|
@ -228,7 +228,7 @@ final class XdsLoadBalancer extends LoadBalancer {
|
|||
}
|
||||
xdsLbState =
|
||||
new XdsLbState(newBalancerName, childPolicy, helper, localityStore, lbChannel,
|
||||
adsStreamCallback);
|
||||
adsStreamCallback, backoffPolicyProvider);
|
||||
}
|
||||
|
||||
private static ManagedChannel initLbChannel(Helper helper, String balancerName) {
|
||||
|
|
|
@ -19,6 +19,8 @@ package io.grpc.xds;
|
|||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.inOrder;
|
||||
|
@ -54,6 +56,8 @@ import io.grpc.Status;
|
|||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.inprocess.InProcessChannelBuilder;
|
||||
import io.grpc.inprocess.InProcessServerBuilder;
|
||||
import io.grpc.internal.BackoffPolicy;
|
||||
import io.grpc.internal.FakeClock;
|
||||
import io.grpc.internal.testing.StreamRecorder;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import io.grpc.testing.GrpcCleanupRule;
|
||||
|
@ -78,6 +82,16 @@ import org.mockito.MockitoAnnotations;
|
|||
*/
|
||||
@RunWith(JUnit4.class)
|
||||
public class XdsCommsTest {
|
||||
private static final String EDS_TYPE_URL =
|
||||
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
|
||||
private static final FakeClock.TaskFilter LB_RPC_RETRY_TASK_FILTER =
|
||||
new FakeClock.TaskFilter() {
|
||||
@Override
|
||||
public boolean shouldAccept(Runnable command) {
|
||||
return command.toString().contains("AdsRpcRetryTask");
|
||||
}
|
||||
};
|
||||
|
||||
@Rule
|
||||
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
|
||||
@Mock
|
||||
|
@ -86,9 +100,16 @@ public class XdsCommsTest {
|
|||
private AdsStreamCallback adsStreamCallback;
|
||||
@Mock
|
||||
private LocalityStore localityStore;
|
||||
@Mock
|
||||
private BackoffPolicy.Provider backoffPolicyProvider;
|
||||
@Mock
|
||||
private BackoffPolicy backoffPolicy1;
|
||||
@Mock
|
||||
private BackoffPolicy backoffPolicy2;
|
||||
@Captor
|
||||
private ArgumentCaptor<Map<XdsLocality, LocalityInfo>> localityEndpointsMappingCaptor;
|
||||
|
||||
private final FakeClock fakeClock = new FakeClock();
|
||||
private final SynchronizationContext syncContext = new SynchronizationContext(
|
||||
new Thread.UncaughtExceptionHandler() {
|
||||
@Override
|
||||
|
@ -98,7 +119,7 @@ public class XdsCommsTest {
|
|||
});
|
||||
private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry();
|
||||
|
||||
private final StreamRecorder<DiscoveryRequest> streamRecorder = StreamRecorder.create();
|
||||
private StreamRecorder<DiscoveryRequest> streamRecorder;
|
||||
private StreamObserver<DiscoveryResponse> responseWriter;
|
||||
|
||||
private ManagedChannel channel;
|
||||
|
@ -115,6 +136,7 @@ public class XdsCommsTest {
|
|||
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
|
||||
final StreamObserver<DiscoveryResponse> responseObserver) {
|
||||
responseWriter = responseObserver;
|
||||
streamRecorder = StreamRecorder.create();
|
||||
|
||||
return new StreamObserver<DiscoveryRequest>() {
|
||||
|
||||
|
@ -131,7 +153,6 @@ public class XdsCommsTest {
|
|||
@Override
|
||||
public void onCompleted() {
|
||||
streamRecorder.onCompleted();
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -148,6 +169,7 @@ public class XdsCommsTest {
|
|||
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
|
||||
doReturn("fake_authority").when(helper).getAuthority();
|
||||
doReturn(syncContext).when(helper).getSynchronizationContext();
|
||||
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
|
||||
doReturn(mock(ChannelLogger.class)).when(helper).getChannelLogger();
|
||||
lbRegistry.register(new LoadBalancerProvider() {
|
||||
@Override
|
||||
|
@ -170,7 +192,12 @@ public class XdsCommsTest {
|
|||
return null;
|
||||
}
|
||||
});
|
||||
xdsComms = new XdsComms(channel, helper, adsStreamCallback, localityStore);
|
||||
doReturn(backoffPolicy1, backoffPolicy2).when(backoffPolicyProvider).get();
|
||||
doReturn(10L, 100L, 1000L).when(backoffPolicy1).nextBackoffNanos();
|
||||
doReturn(20L, 200L).when(backoffPolicy2).nextBackoffNanos();
|
||||
xdsComms = new XdsComms(
|
||||
channel, helper, adsStreamCallback, localityStore, backoffPolicyProvider,
|
||||
fakeClock.getStopwatchSupplier());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -192,8 +219,7 @@ public class XdsCommsTest {
|
|||
public void standardMode_sendEdsRequest_getEdsResponse_withNoDrop() {
|
||||
assertThat(streamRecorder.getValues()).hasSize(1);
|
||||
DiscoveryRequest request = streamRecorder.getValues().get(0);
|
||||
assertThat(request.getTypeUrl())
|
||||
.isEqualTo("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment");
|
||||
assertThat(request.getTypeUrl()).isEqualTo(EDS_TYPE_URL);
|
||||
assertThat(
|
||||
request.getNode().getMetadata().getFieldsOrThrow("endpoints_required").getBoolValue())
|
||||
.isTrue();
|
||||
|
@ -243,7 +269,7 @@ public class XdsCommsTest {
|
|||
.addLbEndpoints(endpoint22)
|
||||
.setLoadBalancingWeight(UInt32Value.of(2)))
|
||||
.build()))
|
||||
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
|
||||
.setTypeUrl(EDS_TYPE_URL)
|
||||
.build();
|
||||
responseWriter.onNext(edsResponse);
|
||||
|
||||
|
@ -282,7 +308,7 @@ public class XdsCommsTest {
|
|||
.addLbEndpoints(endpoint12)
|
||||
.setLoadBalancingWeight(UInt32Value.of(1)))
|
||||
.build()))
|
||||
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
|
||||
.setTypeUrl(EDS_TYPE_URL)
|
||||
.build();
|
||||
responseWriter.onNext(edsResponse);
|
||||
|
||||
|
@ -351,7 +377,7 @@ public class XdsCommsTest {
|
|||
.build())
|
||||
.build())
|
||||
.build()))
|
||||
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
|
||||
.setTypeUrl(EDS_TYPE_URL)
|
||||
.build();
|
||||
responseWriter.onNext(edsResponseWithDrops);
|
||||
|
||||
|
@ -414,7 +440,7 @@ public class XdsCommsTest {
|
|||
.build())
|
||||
.build())
|
||||
.build()))
|
||||
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
|
||||
.setTypeUrl(EDS_TYPE_URL)
|
||||
.build();
|
||||
responseWriter.onNext(edsResponseWithAllDrops);
|
||||
|
||||
|
@ -439,4 +465,191 @@ public class XdsCommsTest {
|
|||
verify(adsStreamCallback).onError();
|
||||
verifyNoMoreInteractions(adsStreamCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* The 1st ADS RPC receives invalid response. Verify retry is scheduled.
|
||||
* Verify the 2nd RPC (retry) starts after backoff.
|
||||
*
|
||||
* <p>The 2nd RPC fails with response observer onError() without receiving initial response.
|
||||
* Verify retry is scheduled. Verify the 3rd PRC starts after backoff.
|
||||
*
|
||||
* <p>The 3rd PRC receives invalid initial response. Verify retry is scheduled.
|
||||
* Verify the 4th PRC starts after backoff.
|
||||
*
|
||||
* <p>The 4th RPC receives valid initial response and then fails with response observer
|
||||
* onError(). Verify retry is scheduled. Verify the backoff is reset. Verify the 5th PRC starts
|
||||
* immediately.
|
||||
*
|
||||
* <p>The 5th RPC fails with response observer onError() without receiving initial response.
|
||||
* Verify retry is scheduled. Verify the 6th PRC starts after backoff.
|
||||
*
|
||||
* <p>The 6th RPC fails with response observer onError() without receiving initial response.
|
||||
* Verify retry is scheduled. Call {@link XdsComms#shutdownLbRpc(String)}, verify retry timer is
|
||||
* cancelled.
|
||||
*/
|
||||
@Test
|
||||
public void adsRpcRetry() {
|
||||
StreamRecorder<DiscoveryRequest> currentStreamRecorder = streamRecorder;
|
||||
assertThat(currentStreamRecorder.getValues()).hasSize(1);
|
||||
InOrder inOrder =
|
||||
inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2, adsStreamCallback);
|
||||
inOrder.verify(backoffPolicyProvider).get();
|
||||
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
|
||||
DiscoveryResponse invalidResponse =
|
||||
DiscoveryResponse.newBuilder().setTypeUrl(EDS_TYPE_URL).build();
|
||||
// The 1st ADS RPC receives invalid response
|
||||
responseWriter.onNext(invalidResponse);
|
||||
inOrder.verify(adsStreamCallback).onError();
|
||||
assertThat(currentStreamRecorder.getError()).isNotNull();
|
||||
|
||||
// Will start backoff sequence 1 (10ns)
|
||||
inOrder.verify(backoffPolicy1).nextBackoffNanos();
|
||||
assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
|
||||
// Fast-forward to a moment before the retry
|
||||
fakeClock.forwardNanos(9);
|
||||
assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
assertSame(streamRecorder, currentStreamRecorder);
|
||||
|
||||
// Then time for retry
|
||||
fakeClock.forwardNanos(1);
|
||||
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
assertNotSame(currentStreamRecorder, streamRecorder);
|
||||
currentStreamRecorder = streamRecorder;
|
||||
assertThat(currentStreamRecorder.getValues()).hasSize(1);
|
||||
|
||||
// Fail the retry after spending 4ns
|
||||
fakeClock.forwardNanos(4);
|
||||
// The 2nd RPC fails with response observer onError() without receiving initial response
|
||||
responseWriter.onError(new Exception("fake error"));
|
||||
inOrder.verify(adsStreamCallback).onError();
|
||||
|
||||
// Will start backoff sequence 2 (100ns)
|
||||
inOrder.verify(backoffPolicy1).nextBackoffNanos();
|
||||
assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
// Fast-forward to a moment before the retry, the time spent in the last try is deducted.
|
||||
fakeClock.forwardNanos(100 - 4 - 1);
|
||||
assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
assertSame(streamRecorder, currentStreamRecorder);
|
||||
|
||||
// Then time for retry
|
||||
fakeClock.forwardNanos(1);
|
||||
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
assertNotSame(currentStreamRecorder, streamRecorder);
|
||||
currentStreamRecorder = streamRecorder;
|
||||
assertThat(currentStreamRecorder.getValues()).hasSize(1);
|
||||
assertThat(currentStreamRecorder.getError()).isNull();
|
||||
|
||||
// Fail the retry after spending 5ns
|
||||
fakeClock.forwardNanos(5);
|
||||
// The 3rd PRC receives invalid initial response.
|
||||
responseWriter.onNext(invalidResponse);
|
||||
inOrder.verify(adsStreamCallback).onError();
|
||||
assertThat(currentStreamRecorder.getError()).isNotNull();
|
||||
|
||||
// Will start backoff sequence 3 (1000ns)
|
||||
inOrder.verify(backoffPolicy1).nextBackoffNanos();
|
||||
assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
|
||||
// Fast-forward to a moment before the retry, the time spent in the last try is deducted.
|
||||
fakeClock.forwardNanos(1000 - 5 - 1);
|
||||
assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
assertSame(streamRecorder, currentStreamRecorder);
|
||||
|
||||
// Then time for retry
|
||||
fakeClock.forwardNanos(1);
|
||||
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
assertNotSame(currentStreamRecorder, streamRecorder);
|
||||
currentStreamRecorder = streamRecorder;
|
||||
assertThat(currentStreamRecorder.getValues()).hasSize(1);
|
||||
assertThat(currentStreamRecorder.getError()).isNull();
|
||||
|
||||
// The 4th RPC receives valid initial response
|
||||
fakeClock.forwardNanos(6);
|
||||
Locality localityProto1 = Locality.newBuilder()
|
||||
.setRegion("region1").setZone("zone1").setSubZone("subzone1").build();
|
||||
LbEndpoint endpoint11 = LbEndpoint.newBuilder()
|
||||
.setEndpoint(Endpoint.newBuilder()
|
||||
.setAddress(Address.newBuilder()
|
||||
.setSocketAddress(SocketAddress.newBuilder()
|
||||
.setAddress("addr11").setPortValue(11))))
|
||||
.setLoadBalancingWeight(UInt32Value.of(11))
|
||||
.build();
|
||||
DiscoveryResponse validEdsResponse = DiscoveryResponse.newBuilder()
|
||||
.addResources(Any.pack(ClusterLoadAssignment.newBuilder()
|
||||
.addEndpoints(LocalityLbEndpoints.newBuilder()
|
||||
.setLocality(localityProto1)
|
||||
.addLbEndpoints(endpoint11)
|
||||
.setLoadBalancingWeight(UInt32Value.of(1)))
|
||||
.build()))
|
||||
.setTypeUrl(EDS_TYPE_URL)
|
||||
.build();
|
||||
responseWriter.onNext(validEdsResponse);
|
||||
|
||||
inOrder.verify(backoffPolicyProvider, never()).get();
|
||||
inOrder.verify(backoffPolicy2, never()).nextBackoffNanos();
|
||||
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
|
||||
// The 4th RPC then fails with response observer onError()
|
||||
fakeClock.forwardNanos(7);
|
||||
responseWriter.onError(new Exception("fake error"));
|
||||
|
||||
// Will reset the retry sequence and retry immediately, because balancer has responded.
|
||||
inOrder.verify(backoffPolicyProvider).get();
|
||||
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
assertNotSame(currentStreamRecorder, streamRecorder);
|
||||
currentStreamRecorder = streamRecorder;
|
||||
assertThat(currentStreamRecorder.getValues()).hasSize(1);
|
||||
assertThat(currentStreamRecorder.getError()).isNull();
|
||||
|
||||
// The 5th RPC fails with response observer onError() without receiving initial response
|
||||
fakeClock.forwardNanos(8);
|
||||
responseWriter.onError(new Exception("fake error"));
|
||||
inOrder.verify(adsStreamCallback).onError();
|
||||
|
||||
// Will start backoff sequence 1 (20ns)
|
||||
inOrder.verify(backoffPolicy2).nextBackoffNanos();
|
||||
assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
// Fast-forward to a moment before the retry, the time spent in the last try is deducted.
|
||||
fakeClock.forwardNanos(20 - 8 - 1);
|
||||
assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
assertSame(streamRecorder, currentStreamRecorder);
|
||||
|
||||
// Then time for retry
|
||||
fakeClock.forwardNanos(1);
|
||||
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
assertNotSame(currentStreamRecorder, streamRecorder);
|
||||
currentStreamRecorder = streamRecorder;
|
||||
assertThat(currentStreamRecorder.getValues()).hasSize(1);
|
||||
assertThat(currentStreamRecorder.getError()).isNull();
|
||||
|
||||
// Wrapping up
|
||||
verify(backoffPolicyProvider, times(2)).get();
|
||||
verify(backoffPolicy1, times(3)).nextBackoffNanos(); // for 2nd, 3rd, 4th RPC
|
||||
verify(backoffPolicy2, times(1)).nextBackoffNanos(); // for 6th RPC
|
||||
|
||||
// The 6th RPC fails with response observer onError() without receiving initial response
|
||||
responseWriter.onError(new Exception("fake error"));
|
||||
inOrder.verify(adsStreamCallback).onError();
|
||||
|
||||
// Retry is scheduled
|
||||
assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
|
||||
// Shutdown cancels retry
|
||||
xdsComms.shutdownLbRpc("shutdown");
|
||||
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void refreshAdsStreamCancelsExistingRetry() {
|
||||
responseWriter.onError(new Exception("fake error"));
|
||||
verify(adsStreamCallback).onError();
|
||||
assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
|
||||
xdsComms.refreshAdsStream();
|
||||
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
|
||||
|
||||
xdsComms.shutdownLbRpc("End test");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import io.grpc.ManagedChannel;
|
|||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.inprocess.InProcessChannelBuilder;
|
||||
import io.grpc.inprocess.InProcessServerBuilder;
|
||||
import io.grpc.internal.BackoffPolicy;
|
||||
import io.grpc.internal.FakeClock;
|
||||
import io.grpc.internal.testing.StreamRecorder;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
|
@ -60,6 +61,8 @@ public class XdsLbStateTest {
|
|||
private AdsStreamCallback adsStreamCallback;
|
||||
@Mock
|
||||
private LocalityStore localityStore;
|
||||
@Mock
|
||||
private BackoffPolicy.Provider backoffPolicyProvider;
|
||||
|
||||
private final FakeClock fakeClock = new FakeClock();
|
||||
|
||||
|
@ -122,8 +125,9 @@ public class XdsLbStateTest {
|
|||
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
|
||||
doReturn(channel).when(helper).createResolvingOobChannel(BALANCER_NAME);
|
||||
|
||||
xdsLbState =
|
||||
new XdsLbState(BALANCER_NAME, null, helper, localityStore, channel, adsStreamCallback);
|
||||
xdsLbState = new XdsLbState(
|
||||
BALANCER_NAME, null, helper, localityStore, channel, adsStreamCallback,
|
||||
backoffPolicyProvider);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue