diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java index d7eb9383f0..76d60a40c4 100644 --- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java @@ -36,6 +36,8 @@ import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.EnvoyProtoData.Node; @@ -71,6 +73,7 @@ final class AbstractXdsClient { private final BackoffPolicy.Provider backoffPolicyProvider; private final Stopwatch stopwatch; private final Node bootstrapNode; + private final XdsClient.TimerLaunch timerLaunch; // Last successfully applied version_info for each resource type. Starts with empty string. // A version_info is used to update management server with client's most recent knowledge of @@ -98,7 +101,8 @@ final class AbstractXdsClient { timeService, SynchronizationContext syncContext, BackoffPolicy.Provider backoffPolicyProvider, - Supplier stopwatchSupplier) { + Supplier stopwatchSupplier, + XdsClient.TimerLaunch timerLaunch) { this.serverInfo = checkNotNull(serverInfo, "serverInfo"); this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo); this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler"); @@ -108,6 +112,7 @@ final class AbstractXdsClient { this.timeService = checkNotNull(timeService, "timeService"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); + this.timerLaunch = checkNotNull(timerLaunch, "timerLaunch"); stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get(); logId = InternalLogId.allocate("xds-client", 
serverInfo.target()); logger = XdsLogger.withLogId(logId); @@ -199,6 +204,22 @@ final class AbstractXdsClient { return rpcRetryTimer != null && rpcRetryTimer.isPending(); } + boolean isReady() { + return adsStream != null && adsStream.isReady(); + } + + /** + * Starts a timer for each requested resource that hasn't been responded to and + * has been waiting for the channel to get ready. + */ + void readyHandler() { + if (!isReady()) { + return; + } + + timerLaunch.startSubscriberTimersIfNeeded(serverInfo); + } + /** * Establishes the RPC connection by creating a new RPC stream on the given channel for * xDS protocol communication. @@ -262,6 +283,8 @@ final class AbstractXdsClient { abstract void sendError(Exception error); + abstract boolean isReady(); + /** * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and * {@code errorDetail}. Used for reacting to a specific discovery response. For @@ -344,13 +367,26 @@ final class AbstractXdsClient { private final class AdsStreamV2 extends AbstractAdsStream { private StreamObserver requestWriter; + @Override + public boolean isReady() { + return requestWriter != null && ((ClientCallStreamObserver) requestWriter).isReady(); + } + @Override void start() { io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc .AggregatedDiscoveryServiceStub stub = io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.newStub(channel); StreamObserver responseReaderV2 = - new StreamObserver() { + new ClientResponseObserver() { + + @Override + public void beforeStart( + ClientCallStreamObserver reqStream) { + reqStream.setOnReadyHandler(AbstractXdsClient.this::readyHandler); + } + @Override public void onNext(final io.envoyproxy.envoy.api.v2.DiscoveryResponse response) { syncContext.execute(new Runnable() { @@ -427,11 +463,23 @@ final class AbstractXdsClient { private final class AdsStreamV3 extends AbstractAdsStream { private StreamObserver requestWriter; + @Override + public 
boolean isReady() { + return requestWriter != null && ((ClientCallStreamObserver) requestWriter).isReady(); + } + @Override void start() { AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub = AggregatedDiscoveryServiceGrpc.newStub(channel); - StreamObserver responseReader = new StreamObserver() { + StreamObserver responseReader = + new ClientResponseObserver() { + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + requestStream.setOnReadyHandler(AbstractXdsClient.this::readyHandler); + } + @Override public void onNext(final DiscoveryResponse response) { syncContext.execute(new Runnable() { diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 82af865115..591c4d7f33 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -373,4 +373,12 @@ abstract class XdsClient { Map> getSubscribedResourceTypesWithTypeUrl(); } + + interface TimerLaunch { + /** + * For all subscribers for the specified server, if the resource hasn't yet been + * resolved then start a timer for it. + */ + void startSubscriberTimersIfNeeded(ServerInfo serverInfo); + } } diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 26392acc73..b72364530d 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -47,6 +47,7 @@ import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats; import io.grpc.xds.XdsClient.ResourceStore; +import io.grpc.xds.XdsClient.TimerLaunch; import io.grpc.xds.XdsClient.XdsResponseHandler; import io.grpc.xds.XdsLogger.XdsLogLevel; import java.net.URI; @@ -65,9 +66,10 @@ import java.util.logging.Logger; import javax.annotation.Nullable; /** - * XdsClient implementation for client side usages. 
+ * XdsClient implementation. */ -final class XdsClientImpl extends XdsClient implements XdsResponseHandler, ResourceStore { +final class XdsClientImpl extends XdsClient + implements XdsResponseHandler, ResourceStore, TimerLaunch { private static boolean LOG_XDS_NODE_ID = Boolean.parseBoolean( System.getenv("GRPC_LOG_XDS_NODE_ID")); @@ -152,7 +154,8 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou timeService, syncContext, backoffPolicyProvider, - stopwatchSupplier); + stopwatchSupplier, + this); LoadReportClient lrsClient = new LoadReportClient( loadStatsManager, xdsChannel.channel(), context, serverInfo.useProtocolV3(), bootstrapInfo.node(), syncContext, timeService, backoffPolicyProvider, stopwatchSupplier); @@ -188,7 +191,9 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou for (Map> subscriberMap : resourceSubscribers.values()) { for (ResourceSubscriber subscriber : subscriberMap.values()) { - subscriber.onError(error); + if (!subscriber.hasResult()) { + subscriber.onError(error); + } } } } @@ -386,6 +391,30 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou return logId.toString(); } + @Override + public void startSubscriberTimersIfNeeded(ServerInfo serverInfo) { + if (isShutDown()) { + return; + } + + syncContext.execute(new Runnable() { + @Override + public void run() { + if (isShutDown()) { + return; + } + + for (Map> subscriberMap : resourceSubscribers.values()) { + for (ResourceSubscriber subscriber : subscriberMap.values()) { + if (subscriber.serverInfo.equals(serverInfo) && subscriber.respTimer == null) { + subscriber.restartTimer(); + } + } + } + } + }); + } + private void cleanUpResourceTimers() { for (Map> subscriberMap : resourceSubscribers.values()) { for (ResourceSubscriber subscriber : subscriberMap.values()) { @@ -537,6 +566,10 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou if (data != null || absent) { // 
resource already resolved return; } + if (!xdsChannel.isReady()) { // When channel becomes ready, it will trigger a restartTimer + return; + } + class ResourceNotFound implements Runnable { @Override public void run() { @@ -554,6 +587,7 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED. metadata = ResourceMetadata.newResourceMetadataRequested(); + respTimer = syncContext.schedule( new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService); @@ -585,6 +619,10 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou return !watchers.isEmpty(); } + boolean hasResult() { + return data != null || absent; + } + void onData(ParsedResource parsedResource, String version, long updateTime) { if (respTimer != null && respTimer.isPending()) { respTimer.cancel(); diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index bc91f39cb2..f3ce4d7182 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -50,6 +50,7 @@ import io.grpc.Context; import io.grpc.Context.CancellableContext; import io.grpc.InsecureChannelCredentials; import io.grpc.ManagedChannel; +import io.grpc.Server; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.inprocess.InProcessChannelBuilder; @@ -95,6 +96,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; @@ -185,7 +188,8 @@ public abstract class XdsClientImplTestBase { public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); private final FakeClock 
fakeClock = new FakeClock(); - protected final Queue resourceDiscoveryCalls = new ArrayDeque<>(); + protected final BlockingDeque resourceDiscoveryCalls = + new LinkedBlockingDeque<>(1); protected final Queue loadReportCalls = new ArrayDeque<>(); protected final AtomicBoolean adsEnded = new AtomicBoolean(true); protected final AtomicBoolean lrsEnded = new AtomicBoolean(true); @@ -244,6 +248,7 @@ public abstract class XdsClientImplTestBase { private ArgumentCaptor edsUpdateCaptor; @Captor private ArgumentCaptor errorCaptor; + @Mock private BackoffPolicy.Provider backoffPolicyProvider; @Mock @@ -269,6 +274,10 @@ public abstract class XdsClientImplTestBase { private boolean originalEnableRbac; private boolean originalEnableLeastRequest; private boolean originalEnableFederation; + private Server xdsServer; + private final String serverName = InProcessServerBuilder.generateName(); + private BindableService adsService = createAdsService(); + private BindableService lrsService = createLrsService(); @Before public void setUp() throws IOException { @@ -286,15 +295,13 @@ public abstract class XdsClientImplTestBase { originalEnableLeastRequest = XdsResourceType.enableLeastRequest; XdsResourceType.enableLeastRequest = true; originalEnableFederation = BootstrapperImpl.enableFederation; - final String serverName = InProcessServerBuilder.generateName(); - cleanupRule.register( - InProcessServerBuilder - .forName(serverName) - .addService(createAdsService()) - .addService(createLrsService()) - .directExecutor() - .build() - .start()); + xdsServer = cleanupRule.register(InProcessServerBuilder + .forName(serverName) + .addService(adsService) + .addService(lrsService) + .directExecutor() + .build() + .start()); channel = cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); XdsChannelFactory xdsChannelFactory = new XdsChannelFactory() { @@ -1040,10 +1047,10 @@ public abstract class XdsClientImplTestBase { 
assertThat(error.getCode()).isEqualTo(Code.INVALID_ARGUMENT); assertThat(error.getDescription()).isEqualTo( "Wrong configuration: xds server does not exist for resource " + rdsResourceName); - assertThat(resourceDiscoveryCalls.poll()).isNull(); + assertThat(resourceDiscoveryCalls.size()).isEqualTo(0); xdsClient.cancelXdsResourceWatch( XdsRouteConfigureResource.getInstance(),rdsResourceName, rdsResourceWatcher); - assertThat(resourceDiscoveryCalls.poll()).isNull(); + assertThat(resourceDiscoveryCalls.size()).isEqualTo(0); } @Test @@ -3238,10 +3245,8 @@ public abstract class XdsClientImplTestBase { call.verifyRequest(RDS, RDS_RESOURCE, "5", "6764", NODE); call.sendError(Status.DEADLINE_EXCEEDED.asException()); - verify(ldsResourceWatcher, times(3)).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, ""); - verify(rdsResourceWatcher, times(3)).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, ""); + verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture()); + verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture()); verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, ""); verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture()); @@ -3262,10 +3267,8 @@ public abstract class XdsClientImplTestBase { // Management server becomes unreachable again. 
call.sendError(Status.UNAVAILABLE.asException()); - verify(ldsResourceWatcher, times(4)).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); - verify(rdsResourceWatcher, times(4)).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); + verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture()); + verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture()); verify(cdsResourceWatcher, times(4)).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); verify(edsResourceWatcher, times(4)).onError(errorCaptor.capture()); @@ -3349,10 +3352,8 @@ public abstract class XdsClientImplTestBase { call.sendError(Status.UNAVAILABLE.asException()); assertThat(cdsResourceTimeout.isCancelled()).isTrue(); assertThat(edsResourceTimeout.isCancelled()).isTrue(); - verify(ldsResourceWatcher).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); - verify(rdsResourceWatcher).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); + verify(ldsResourceWatcher, never()).onError(errorCaptor.capture()); + verify(rdsResourceWatcher, never()).onError(errorCaptor.capture()); verify(cdsResourceWatcher).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); verify(edsResourceWatcher).onError(errorCaptor.capture()); @@ -3509,6 +3510,48 @@ public abstract class XdsClientImplTestBase { verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); } + @Test + public void sendingToStoppedServer() throws Exception { + try { + // Establish the adsStream object + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CDS_RESOURCE, + cdsResourceWatcher); + resourceDiscoveryCalls.take(); // clear this entry + + // Shutdown server and initiate a request + xdsServer.shutdownNow(); + 
xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, + ldsResourceWatcher); + fakeClock.forwardTime(14, TimeUnit.SECONDS); + + // Restart the server + xdsServer = cleanupRule.register( + InProcessServerBuilder + .forName(serverName) + .addService(adsService) + .addService(lrsService) + .directExecutor() + .build() + .start()); + fakeClock.forwardTime(5, TimeUnit.SECONDS); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS); + Thread.sleep(1); // For some reason the V2 test fails the verifyRequest without this + + // Send a response and do verifications + verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE); + call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0001"); + call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0001", NODE); + verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); + verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue()); + assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT); + verifySubscribedResourcesMetadataSizes(1, 1, 0, 0); + } catch (Throwable t) { + throw t; // This allows putting a breakpoint here for debugging + } + } + + private DiscoveryRpcCall startResourceWatcher( XdsResourceType type, String name, ResourceWatcher watcher) { FakeClock.TaskFilter timeoutTaskFilter; @@ -3532,6 +3575,7 @@ public abstract class XdsClientImplTestBase { default: throw new AssertionError("should never be here"); } + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); call.verifyRequest(type, Collections.singletonList(name), "", "", NODE); ScheduledTask timeoutTask =