xds: Update logic so that an error being reported when stream is closed gets propagated to subscribers (#9827)

* Stop setting waitForReady in XdsClient's AbstractXdsClient.
* Handle bad URL cleanly.  

Fix test cases to deal with asynchronous flow.
This commit is contained in:
Larry Safran 2023-01-25 02:31:50 +00:00 committed by GitHub
parent b0635fa1d4
commit 501ca8f7b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 104 additions and 12 deletions

View File

@ -61,6 +61,8 @@ import javax.annotation.Nullable;
* the xDS RPC stream.
*/
final class AbstractXdsClient {
public static final String CLOSED_BY_SERVER = "Closed by server";
private final SynchronizationContext syncContext;
private final InternalLogId logId;
private final XdsLogger logger;
@ -217,6 +219,11 @@ final class AbstractXdsClient {
return;
}
if (isInBackoff()) {
rpcRetryTimer.cancel();
rpcRetryTimer = null;
}
timerLaunch.startSubscriberTimersIfNeeded(serverInfo);
}
@ -315,21 +322,25 @@ final class AbstractXdsClient {
}
final void handleRpcCompleted() {
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
}
private void handleRpcStreamClosed(Status error) {
checkArgument(!error.isOk(), "unexpected OK status");
if (closed) {
return;
}
checkArgument(!error.isOk(), "unexpected OK status");
String errorMsg = error.getDescription() != null
&& error.getDescription().equals(CLOSED_BY_SERVER)
? "ADS stream closed with status {0}: {1}. Cause: {2}"
: "ADS stream failed with status {0}: {1}. Cause: {2}";
logger.log(
XdsLogLevel.ERROR,
"ADS stream closed with status {0}: {1}. Cause: {2}",
error.getCode(), error.getDescription(), error.getCause());
XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
closed = true;
xdsResponseHandler.handleStreamClosed(error);
cleanUp();
if (responseReceived || retryBackoffPolicy == null) {
// Reset the backoff sequence if had received a response, or backoff sequence
// has never been initialized.
@ -423,7 +434,7 @@ final class AbstractXdsClient {
});
}
};
requestWriter = stub.withWaitForReady().streamAggregatedResources(responseReader);
requestWriter = stub.streamAggregatedResources(responseReader);
}
@Override

View File

@ -514,11 +514,22 @@ final class XdsClientImpl extends XdsClient
// Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
// is created but not yet requested because the client is in backoff.
this.metadata = ResourceMetadata.newResourceMetadataUnknown();
maybeCreateXdsChannelWithLrs(serverInfo);
this.xdsChannel = serverChannelMap.get(serverInfo);
if (xdsChannel.isInBackoff()) {
AbstractXdsClient xdsChannelTemp = null;
try {
maybeCreateXdsChannelWithLrs(serverInfo);
xdsChannelTemp = serverChannelMap.get(serverInfo);
if (xdsChannelTemp.isInBackoff()) {
return;
}
} catch (IllegalArgumentException e) {
xdsChannelTemp = null;
this.errorDescription = "Bad configuration: " + e.getMessage();
return;
} finally {
this.xdsChannel = xdsChannelTemp;
}
restartTimer();
}

View File

@ -18,6 +18,7 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static io.grpc.xds.XdsClientImpl.XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
@ -70,6 +71,7 @@ import io.grpc.internal.TimeProvider;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.Bootstrapper.AuthorityInfo;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Bootstrapper.CertificateProviderInfo;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.Endpoints.DropOverload;
@ -114,6 +116,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
@ -3226,7 +3229,8 @@ public abstract class XdsClientImplTestBase {
// Management server closes the RPC stream with an error.
call.sendError(Status.UNKNOWN.asException());
verify(ldsResourceWatcher).onError(errorCaptor.capture());
verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
.onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, "");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, "");
@ -3336,7 +3340,8 @@ public abstract class XdsClientImplTestBase {
RDS_RESOURCE, rdsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
call.sendError(Status.UNAVAILABLE.asException());
verify(ldsResourceWatcher).onError(errorCaptor.capture());
verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
.onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
@ -3573,13 +3578,18 @@ public abstract class XdsClientImplTestBase {
.build()
.start());
fakeClock.forwardTime(5, TimeUnit.SECONDS);
verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE);
fakeClock.forwardTime(20, TimeUnit.SECONDS); // Trigger rpcRetryTimer
DiscoveryRpcCall call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS);
if (call == null) { // The first rpcRetry may have happened before the channel was ready
fakeClock.forwardTime(50, TimeUnit.SECONDS);
call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS);
}
// NOTE: There is a ScheduledExecutorService that may get involved due to the reconnect
// so you cannot rely on the logic being single threaded. The timeout() in verifyRequest
// is therefore necessary to avoid flakiness.
// Send a response and do verifications
verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE);
call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0001");
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0001", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
@ -3592,6 +3602,66 @@ public abstract class XdsClientImplTestBase {
}
}
@Test
public void sendToBadUrl() throws Exception {
// Setup xdsClient to fail on stream creation
XdsClientImpl client = createXdsClient("some. garbage");
client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher);
fakeClock.forwardTime(20, TimeUnit.SECONDS);
verify(ldsResourceWatcher, Mockito.timeout(5000).times(1)).onError(ArgumentMatchers.any());
client.shutdown();
}
@Test
public void sendToNonexistentHost() throws Exception {
// Setup xdsClient to fail on stream creation
XdsClientImpl client = createXdsClient("some.garbage");
client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher);
fakeClock.forwardTime(20, TimeUnit.SECONDS);
verify(ldsResourceWatcher, Mockito.timeout(5000).times(1)).onError(ArgumentMatchers.any());
fakeClock.forwardTime(50, TimeUnit.SECONDS); // Trigger rpcRetry if appropriate
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
client.shutdown();
}
private XdsClientImpl createXdsClient(String serverUri) {
BootstrapInfo bootstrapInfo = buildBootStrap(serverUri);
return new XdsClientImpl(
DEFAULT_XDS_CHANNEL_FACTORY,
bootstrapInfo,
Context.ROOT,
fakeClock.getScheduledExecutorService(),
backoffPolicyProvider,
fakeClock.getStopwatchSupplier(),
timeProvider,
tlsContextManager);
}
private BootstrapInfo buildBootStrap(String serverUri) {
ServerInfo xdsServerInfo = ServerInfo.create(serverUri, CHANNEL_CREDENTIALS,
ignoreResourceDeletion());
return Bootstrapper.BootstrapInfo.builder()
.servers(Collections.singletonList(xdsServerInfo))
.node(NODE)
.authorities(ImmutableMap.of(
"authority.xds.com",
AuthorityInfo.create(
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))),
"",
AuthorityInfo.create(
"xdstp:///envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
.certProviders(ImmutableMap.of("cert-instance-name",
CertificateProviderInfo.create("file-watcher", ImmutableMap.<String, Object>of())))
.build();
}
private <T extends ResourceUpdate> DiscoveryRpcCall startResourceWatcher(
XdsResourceType<T> type, String name, ResourceWatcher<T> watcher) {