xds:Update logic to match A57 (#9745)

* xds: Change the timer creation logic to wait until the adsStream is ready before creating the timer that marks resources absent.

* xds: When the ADS stream is closed, only send errors to subscribers that haven't yet received results, to match the spec.

* Use a blocking queue to avoid the 2-second sleep.
For some unexplained reason, the subsequent call.verifyRequest fails (only in the V2 test, and only when run from the command line rather than the IDE) unless a Thread.sleep precedes it, even one of only 1 millisecond.
This commit is contained in:
Larry Safran 2022-12-15 22:54:35 +00:00 committed by GitHub
parent ccb5d945c0
commit 46ed02ed72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 169 additions and 31 deletions

View File

@ -36,6 +36,8 @@ import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.Node;
@ -71,6 +73,7 @@ final class AbstractXdsClient {
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Stopwatch stopwatch;
private final Node bootstrapNode;
private final XdsClient.TimerLaunch timerLaunch;
// Last successfully applied version_info for each resource type. Starts with empty string.
// A version_info is used to update management server with client's most recent knowledge of
@ -98,7 +101,8 @@ final class AbstractXdsClient {
timeService,
SynchronizationContext syncContext,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
Supplier<Stopwatch> stopwatchSupplier,
XdsClient.TimerLaunch timerLaunch) {
this.serverInfo = checkNotNull(serverInfo, "serverInfo");
this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo);
this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
@ -108,6 +112,7 @@ final class AbstractXdsClient {
this.timeService = checkNotNull(timeService, "timeService");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.timerLaunch = checkNotNull(timerLaunch, "timerLaunch");
stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
logId = InternalLogId.allocate("xds-client", serverInfo.target());
logger = XdsLogger.withLogId(logId);
@ -199,6 +204,22 @@ final class AbstractXdsClient {
return rpcRetryTimer != null && rpcRetryTimer.isPending();
}
/**
 * Reports whether an ADS stream currently exists and that stream reports itself ready.
 */
boolean isReady() {
  if (adsStream == null) {
    return false;
  }
  return adsStream.isReady();
}
/**
 * On-ready callback for the ADS request stream. When the stream is ready, asks the timer
 * launcher to start timers for this server's requested resources that have not been
 * responded to yet; does nothing otherwise.
 */
void readyHandler() {
  if (isReady()) {
    timerLaunch.startSubscriberTimersIfNeeded(serverInfo);
  }
}
/**
* Establishes the RPC connection by creating a new RPC stream on the given channel for
* xDS protocol communication.
@ -262,6 +283,8 @@ final class AbstractXdsClient {
abstract void sendError(Exception error);
/** Returns whether this stream can currently send; the visible implementations check the request writer. */
abstract boolean isReady();
/**
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
* {@code errorDetail}. Used for reacting to a specific discovery response. For
@ -344,13 +367,26 @@ final class AbstractXdsClient {
private final class AdsStreamV2 extends AbstractAdsStream {
private StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryRequest> requestWriter;
/** Ready only once a request writer exists and that writer reports ready. */
@Override
public boolean isReady() {
  if (requestWriter == null) {
    return false;
  }
  ClientCallStreamObserver<?> callObserver = (ClientCallStreamObserver<?>) requestWriter;
  return callObserver.isReady();
}
@Override
void start() {
io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc
.AggregatedDiscoveryServiceStub stub =
io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.newStub(channel);
StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryResponse> responseReaderV2 =
new StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryResponse>() {
new ClientResponseObserver<io.envoyproxy.envoy.api.v2.DiscoveryRequest,
io.envoyproxy.envoy.api.v2.DiscoveryResponse>() {
@Override
public void beforeStart(
ClientCallStreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryRequest> reqStream) {
reqStream.setOnReadyHandler(AbstractXdsClient.this::readyHandler);
}
@Override
public void onNext(final io.envoyproxy.envoy.api.v2.DiscoveryResponse response) {
syncContext.execute(new Runnable() {
@ -427,11 +463,23 @@ final class AbstractXdsClient {
private final class AdsStreamV3 extends AbstractAdsStream {
private StreamObserver<DiscoveryRequest> requestWriter;
/** Ready only once a request writer exists and that writer reports ready. */
@Override
public boolean isReady() {
  if (requestWriter == null) {
    return false;
  }
  ClientCallStreamObserver<?> callObserver = (ClientCallStreamObserver<?>) requestWriter;
  return callObserver.isReady();
}
@Override
void start() {
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
AggregatedDiscoveryServiceGrpc.newStub(channel);
StreamObserver<DiscoveryResponse> responseReader = new StreamObserver<DiscoveryResponse>() {
StreamObserver<DiscoveryResponse> responseReader =
new ClientResponseObserver<DiscoveryRequest,DiscoveryResponse>() {
@Override
public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
requestStream.setOnReadyHandler(AbstractXdsClient.this::readyHandler);
}
@Override
public void onNext(final DiscoveryResponse response) {
syncContext.execute(new Runnable() {

View File

@ -373,4 +373,12 @@ abstract class XdsClient {
Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl();
}
interface TimerLaunch {
/**
* For all subscribers for the specified server, if the resource hasn't yet been
* resolved then start a timer for it.
*/
void startSubscriberTimersIfNeeded(ServerInfo serverInfo);
}
}

View File

@ -47,6 +47,7 @@ import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
import io.grpc.xds.XdsClient.ResourceStore;
import io.grpc.xds.XdsClient.TimerLaunch;
import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.net.URI;
@ -65,9 +66,10 @@ import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* XdsClient implementation for client side usages.
* XdsClient implementation.
*/
final class XdsClientImpl extends XdsClient implements XdsResponseHandler, ResourceStore {
final class XdsClientImpl extends XdsClient
implements XdsResponseHandler, ResourceStore, TimerLaunch {
private static boolean LOG_XDS_NODE_ID = Boolean.parseBoolean(
System.getenv("GRPC_LOG_XDS_NODE_ID"));
@ -152,7 +154,8 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
timeService,
syncContext,
backoffPolicyProvider,
stopwatchSupplier);
stopwatchSupplier,
this);
LoadReportClient lrsClient = new LoadReportClient(
loadStatsManager, xdsChannel.channel(), context, serverInfo.useProtocolV3(),
bootstrapInfo.node(), syncContext, timeService, backoffPolicyProvider, stopwatchSupplier);
@ -188,7 +191,9 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
subscriber.onError(error);
if (!subscriber.hasResult()) {
subscriber.onError(error);
}
}
}
}
@ -386,6 +391,30 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
return logId.toString();
}
/**
 * Schedules, on the sync context, a pass over every resource subscriber that restarts the
 * timer of each subscriber bound to {@code serverInfo} whose response timer is still unset.
 * Does nothing if the client is shut down, either now or by the time the task runs.
 */
@Override
public void startSubscriberTimersIfNeeded(ServerInfo serverInfo) {
  if (isShutDown()) {
    return;
  }

  syncContext.execute(() -> {
    // Re-check: shutdown may have happened between scheduling and execution.
    if (isShutDown()) {
      return;
    }
    for (Map<String, ResourceSubscriber<?>> subscribersByName : resourceSubscribers.values()) {
      for (ResourceSubscriber<?> candidate : subscribersByName.values()) {
        if (!candidate.serverInfo.equals(serverInfo)) {
          continue;
        }
        if (candidate.respTimer == null) {
          candidate.restartTimer();
        }
      }
    }
  });
}
private void cleanUpResourceTimers() {
for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
@ -537,6 +566,10 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
if (data != null || absent) { // resource already resolved
return;
}
if (!xdsChannel.isReady()) { // When channel becomes ready, it will trigger a restartTimer
return;
}
class ResourceNotFound implements Runnable {
@Override
public void run() {
@ -554,6 +587,7 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
// Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED.
metadata = ResourceMetadata.newResourceMetadataRequested();
respTimer = syncContext.schedule(
new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
timeService);
@ -585,6 +619,10 @@ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, Resou
return !watchers.isEmpty();
}
/** Returns true when this subscriber holds data or has been marked absent. */
boolean hasResult() {
  if (absent) {
    return true;
  }
  return data != null;
}
void onData(ParsedResource<T> parsedResource, String version, long updateTime) {
if (respTimer != null && respTimer.isPending()) {
respTimer.cancel();

View File

@ -50,6 +50,7 @@ import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.inprocess.InProcessChannelBuilder;
@ -95,6 +96,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
@ -185,7 +188,8 @@ public abstract class XdsClientImplTestBase {
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
private final FakeClock fakeClock = new FakeClock();
protected final Queue<DiscoveryRpcCall> resourceDiscoveryCalls = new ArrayDeque<>();
protected final BlockingDeque<DiscoveryRpcCall> resourceDiscoveryCalls =
new LinkedBlockingDeque<>(1);
protected final Queue<LrsRpcCall> loadReportCalls = new ArrayDeque<>();
protected final AtomicBoolean adsEnded = new AtomicBoolean(true);
protected final AtomicBoolean lrsEnded = new AtomicBoolean(true);
@ -244,6 +248,7 @@ public abstract class XdsClientImplTestBase {
private ArgumentCaptor<EdsUpdate> edsUpdateCaptor;
@Captor
private ArgumentCaptor<Status> errorCaptor;
@Mock
private BackoffPolicy.Provider backoffPolicyProvider;
@Mock
@ -269,6 +274,10 @@ public abstract class XdsClientImplTestBase {
private boolean originalEnableRbac;
private boolean originalEnableLeastRequest;
private boolean originalEnableFederation;
private Server xdsServer;
private final String serverName = InProcessServerBuilder.generateName();
private BindableService adsService = createAdsService();
private BindableService lrsService = createLrsService();
@Before
public void setUp() throws IOException {
@ -286,15 +295,13 @@ public abstract class XdsClientImplTestBase {
originalEnableLeastRequest = XdsResourceType.enableLeastRequest;
XdsResourceType.enableLeastRequest = true;
originalEnableFederation = BootstrapperImpl.enableFederation;
final String serverName = InProcessServerBuilder.generateName();
cleanupRule.register(
InProcessServerBuilder
.forName(serverName)
.addService(createAdsService())
.addService(createLrsService())
.directExecutor()
.build()
.start());
xdsServer = cleanupRule.register(InProcessServerBuilder
.forName(serverName)
.addService(adsService)
.addService(lrsService)
.directExecutor()
.build()
.start());
channel =
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
XdsChannelFactory xdsChannelFactory = new XdsChannelFactory() {
@ -1040,10 +1047,10 @@ public abstract class XdsClientImplTestBase {
assertThat(error.getCode()).isEqualTo(Code.INVALID_ARGUMENT);
assertThat(error.getDescription()).isEqualTo(
"Wrong configuration: xds server does not exist for resource " + rdsResourceName);
assertThat(resourceDiscoveryCalls.poll()).isNull();
assertThat(resourceDiscoveryCalls.size()).isEqualTo(0);
xdsClient.cancelXdsResourceWatch(
XdsRouteConfigureResource.getInstance(),rdsResourceName, rdsResourceWatcher);
assertThat(resourceDiscoveryCalls.poll()).isNull();
assertThat(resourceDiscoveryCalls.size()).isEqualTo(0);
}
@Test
@ -3238,10 +3245,8 @@ public abstract class XdsClientImplTestBase {
call.verifyRequest(RDS, RDS_RESOURCE, "5", "6764", NODE);
call.sendError(Status.DEADLINE_EXCEEDED.asException());
verify(ldsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(rdsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture());
@ -3262,10 +3267,8 @@ public abstract class XdsClientImplTestBase {
// Management server becomes unreachable again.
call.sendError(Status.UNAVAILABLE.asException());
verify(ldsResourceWatcher, times(4)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(rdsResourceWatcher, times(4)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(4)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(edsResourceWatcher, times(4)).onError(errorCaptor.capture());
@ -3349,10 +3352,8 @@ public abstract class XdsClientImplTestBase {
call.sendError(Status.UNAVAILABLE.asException());
assertThat(cdsResourceTimeout.isCancelled()).isTrue();
assertThat(edsResourceTimeout.isCancelled()).isTrue();
verify(ldsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(ldsResourceWatcher, never()).onError(errorCaptor.capture());
verify(rdsResourceWatcher, never()).onError(errorCaptor.capture());
verify(cdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(edsResourceWatcher).onError(errorCaptor.capture());
@ -3509,6 +3510,48 @@ public abstract class XdsClientImplTestBase {
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
}
/**
 * Watches a resource while the management server is shut down, restarts the server, and
 * verifies that the watcher is never told the resource does not exist and that the resource
 * is delivered and ACKed once a response is sent.
 */
@Test
public void sendingToStoppedServer() throws Exception {
  // Establish the adsStream object
  xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CDS_RESOURCE,
      cdsResourceWatcher);
  resourceDiscoveryCalls.take(); // clear this entry

  // Shutdown server and initiate a request
  xdsServer.shutdownNow();
  xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE,
      ldsResourceWatcher);
  fakeClock.forwardTime(14, TimeUnit.SECONDS);

  // Restart the server
  xdsServer = cleanupRule.register(
      InProcessServerBuilder
          .forName(serverName)
          .addService(adsService)
          .addService(lrsService)
          .directExecutor()
          .build()
          .start());
  fakeClock.forwardTime(5, TimeUnit.SECONDS);
  DiscoveryRpcCall call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS);
  // poll() yields null on timeout; fail here with a clear assertion rather than an NPE below.
  assertThat(call).isNotNull();
  Thread.sleep(1); // For some reason the V2 test fails the verifyRequest without this

  // Send a response and do verifications
  verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE);
  call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0001");
  call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0001", NODE);
  verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
  verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
  assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
  verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
  verifySubscribedResourcesMetadataSizes(1, 1, 0, 0);
}
private <T extends ResourceUpdate> DiscoveryRpcCall startResourceWatcher(
XdsResourceType<T> type, String name, ResourceWatcher<T> watcher) {
FakeClock.TaskFilter timeoutTaskFilter;
@ -3532,6 +3575,7 @@ public abstract class XdsClientImplTestBase {
default:
throw new AssertionError("should never be here");
}
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
call.verifyRequest(type, Collections.singletonList(name), "", "", NODE);
ScheduledTask timeoutTask =