xds: apply valid resources while NACKing update (#8506)

Implementing [gRFC A46](https://github.com/grpc/proposal/pull/260)
This commit is contained in:
ZHANG Dapeng 2021-09-11 21:57:47 -07:00 committed by GitHub
parent 7ad7876e99
commit 7a65c74283
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 248 additions and 96 deletions

View File

@ -190,6 +190,7 @@ final class ClientXdsClient extends AbstractXdsClient {
protected void handleLdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();
Set<String> retainedRdsResources = new HashSet<>();
@ -222,6 +223,7 @@ final class ClientXdsClient extends AbstractXdsClient {
} catch (ResourceInvalidException e) {
errors.add(
"LDS response Listener '" + listenerName + "' validation error: " + e.getMessage());
invalidResources.add(listenerName);
continue;
}
@ -231,19 +233,9 @@ final class ClientXdsClient extends AbstractXdsClient {
getLogger().log(XdsLogLevel.INFO,
"Received LDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);
if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.LDS, unpackedResources, versionInfo, nonce, errors);
return;
}
handleResourcesAccepted(ResourceType.LDS, parsedResources, versionInfo, nonce);
for (String resource : rdsResourceSubscribers.keySet()) {
if (!retainedRdsResources.contains(resource)) {
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resource);
subscriber.onAbsent();
}
}
handleResourceUpdate(
ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, versionInfo,
nonce, errors);
}
private LdsUpdate processClientSideListener(
@ -1313,6 +1305,7 @@ final class ClientXdsClient extends AbstractXdsClient {
protected void handleRdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();
for (int i = 0; i < resources.size(); i++) {
@ -1340,6 +1333,7 @@ final class ClientXdsClient extends AbstractXdsClient {
errors.add(
"RDS response RouteConfiguration '" + routeConfigName + "' validation error: " + e
.getMessage());
invalidResources.add(routeConfigName);
continue;
}
@ -1348,12 +1342,9 @@ final class ClientXdsClient extends AbstractXdsClient {
getLogger().log(XdsLogLevel.INFO,
"Received RDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);
if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.RDS, unpackedResources, versionInfo, nonce, errors);
} else {
handleResourcesAccepted(ResourceType.RDS, parsedResources, versionInfo, nonce);
}
handleResourceUpdate(
ResourceType.RDS, parsedResources, invalidResources, Collections.<String>emptySet(),
versionInfo, nonce, errors);
}
private static RdsUpdate processRouteConfiguration(
@ -1377,6 +1368,7 @@ final class ClientXdsClient extends AbstractXdsClient {
protected void handleCdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();
Set<String> retainedEdsResources = new HashSet<>();
@ -1413,6 +1405,7 @@ final class ClientXdsClient extends AbstractXdsClient {
} catch (ResourceInvalidException e) {
errors.add(
"CDS response Cluster '" + clusterName + "' validation error: " + e.getMessage());
invalidResources.add(clusterName);
continue;
}
parsedResources.put(clusterName, new ParsedResource(cdsUpdate, resource));
@ -1420,21 +1413,9 @@ final class ClientXdsClient extends AbstractXdsClient {
getLogger().log(XdsLogLevel.INFO,
"Received CDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);
if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.CDS, unpackedResources, versionInfo, nonce, errors);
return;
}
handleResourcesAccepted(ResourceType.CDS, parsedResources, versionInfo, nonce);
// CDS responses represents the state of the world, EDS resources not referenced in CDS
// resources should be deleted.
for (String resource : edsResourceSubscribers.keySet()) {
ResourceSubscriber subscriber = edsResourceSubscribers.get(resource);
if (!retainedEdsResources.contains(resource)) {
subscriber.onAbsent();
}
}
handleResourceUpdate(
ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources, versionInfo,
nonce, errors);
}
@VisibleForTesting
@ -1615,6 +1596,7 @@ final class ClientXdsClient extends AbstractXdsClient {
protected void handleEdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();
for (int i = 0; i < resources.size(); i++) {
@ -1649,16 +1631,17 @@ final class ClientXdsClient extends AbstractXdsClient {
} catch (ResourceInvalidException e) {
errors.add("EDS response ClusterLoadAssignment '" + clusterName
+ "' validation error: " + e.getMessage());
invalidResources.add(clusterName);
continue;
}
parsedResources.put(clusterName, new ParsedResource(edsUpdate, resource));
}
if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.EDS, unpackedResources, versionInfo, nonce, errors);
} else {
handleResourcesAccepted(ResourceType.EDS, parsedResources, versionInfo, nonce);
}
getLogger().log(
XdsLogLevel.INFO, "Received EDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);
handleResourceUpdate(
ResourceType.EDS, parsedResources, invalidResources, Collections.<String>emptySet(),
versionInfo, nonce, errors);
}
private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment)
@ -2048,43 +2031,67 @@ final class ClientXdsClient extends AbstractXdsClient {
}
}
private void handleResourcesAccepted(
ResourceType type, Map<String, ParsedResource> parsedResources, String version,
String nonce) {
ackResponse(type, version, nonce);
private void handleResourceUpdate(
ResourceType type, Map<String, ParsedResource> parsedResources, Set<String> invalidResources,
Set<String> retainedResources, String version, String nonce, List<String> errors) {
String errorDetail = null;
if (errors.isEmpty()) {
checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
ackResponse(type, version, nonce);
} else {
errorDetail = Joiner.on('\n').join(errors);
getLogger().log(XdsLogLevel.WARNING,
"Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
type, version, nonce, errorDetail);
nackResponse(type, nonce, errorDetail);
}
long updateTime = timeProvider.currentTimeNanos();
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
String resourceName = entry.getKey();
ResourceSubscriber subscriber = entry.getValue();
// Attach error details to the subscribed resources that included in the ADS update.
if (invalidResources.contains(resourceName)) {
subscriber.onRejected(version, updateTime, errorDetail);
}
// Notify the watchers.
if (parsedResources.containsKey(resourceName)) {
subscriber.onData(parsedResources.get(resourceName), version, updateTime);
} else if (type == ResourceType.LDS || type == ResourceType.CDS) {
if (subscriber.data != null && invalidResources.contains(resourceName)) {
// Update is rejected but keep using the cached data.
if (type == ResourceType.LDS) {
LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
if (hcm != null) {
String rdsName = hcm.rdsName();
if (rdsName != null) {
retainedResources.add(rdsName);
}
}
} else {
CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
String edsName = cdsUpdate.edsServiceName();
if (edsName == null) {
edsName = cdsUpdate.clusterName();
}
retainedResources.add(edsName);
}
continue;
}
// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update.
subscriber.onAbsent();
}
}
}
private void handleResourcesRejected(
ResourceType type, Set<String> unpackedResourceNames, String version,
String nonce, List<String> errors) {
String errorDetail = Joiner.on('\n').join(errors);
getLogger().log(XdsLogLevel.WARNING,
"Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
type, version, nonce, errorDetail);
nackResponse(type, nonce, errorDetail);
long updateTime = timeProvider.currentTimeNanos();
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
String resourceName = entry.getKey();
ResourceSubscriber subscriber = entry.getValue();
// Attach error details to the subscribed resources that included in the ADS update.
if (unpackedResourceNames.contains(resourceName)) {
subscriber.onRejected(version, updateTime, errorDetail);
// LDS/CDS responses represents the state of the world, RDS/EDS resources not referenced in
// LDS/CDS resources should be deleted.
if (type == ResourceType.LDS || type == ResourceType.CDS) {
Map<String, ResourceSubscriber> dependentSubscribers =
type == ResourceType.LDS ? rdsResourceSubscribers : edsResourceSubscribers;
for (String resource : dependentSubscribers.keySet()) {
if (!retainedResources.contains(resource)) {
dependentSubscribers.get(resource).onAbsent();
}
}
}
}

View File

@ -473,11 +473,11 @@ public abstract class ClientXdsClientTestBase {
List<String> errors = ImmutableList.of(
"LDS response Resource index 0 - can't decode Listener: ",
"LDS response Resource index 2 - can't decode Listener: ");
verifyResourceMetadataNacked(LDS, LDS_RESOURCE, null, "", 0, VERSION_1, TIME_INCREMENT, errors);
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerRds, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
// The response is NACKed with the same error message.
call.verifyRequestNack(LDS, LDS_RESOURCE, "", "0000", NODE, errors);
verifyNoInteractions(ldsResourceWatcher);
verify(ldsResourceWatcher).onChanged(any(LdsUpdate.class));
}
/**
@ -517,14 +517,14 @@ public abstract class ClientXdsClientTestBase {
"A", Any.pack(mf.buildListenerWithApiListenerForRds("A", "A.2")),
"B", Any.pack(mf.buildListenerWithApiListenerInvalid("B")));
call.sendResponse(LDS, resourcesV2.values().asList(), VERSION_2, "0001");
// {A, B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B
// {C} -> ACK, version 1
// {A} -> ACK, version 2
// {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B
// {C} -> does not exist
List<String> errorsV2 = ImmutableList.of("LDS response Listener 'B' validation error: ");
verifyResourceMetadataNacked(LDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT,
VERSION_2, TIME_INCREMENT * 2, errorsV2);
verifyResourceMetadataAcked(LDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2);
verifyResourceMetadataNacked(LDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT,
VERSION_2, TIME_INCREMENT * 2, errorsV2);
verifyResourceMetadataAcked(LDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT);
verifyResourceMetadataDoesNotExist(LDS, "C");
call.verifyRequestNack(LDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2);
// LDS -> {B, C} version 3
@ -532,7 +532,7 @@ public abstract class ClientXdsClientTestBase {
"B", Any.pack(mf.buildListenerWithApiListenerForRds("B", "B.3")),
"C", Any.pack(mf.buildListenerWithApiListenerForRds("C", "C.3")));
call.sendResponse(LDS, resourcesV3.values().asList(), VERSION_3, "0002");
// {A} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B
// {A} -> does not exist
// {B, C} -> ACK, version 3
verifyResourceMetadataDoesNotExist(LDS, "A");
verifyResourceMetadataAcked(LDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3);
@ -541,6 +541,73 @@ public abstract class ClientXdsClientTestBase {
verifySubscribedResourcesMetadataSizes(3, 0, 0, 0);
}
@Test
public void ldsResponseErrorHandling_subscribedResourceInvalid_withRdsSubscriptioin() {
List<String> subscribedResourceNames = ImmutableList.of("A", "B", "C");
xdsClient.watchLdsResource("A", ldsResourceWatcher);
xdsClient.watchRdsResource("A.1", rdsResourceWatcher);
xdsClient.watchLdsResource("B", ldsResourceWatcher);
xdsClient.watchRdsResource("B.1", rdsResourceWatcher);
xdsClient.watchLdsResource("C", ldsResourceWatcher);
xdsClient.watchRdsResource("C.1", rdsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
assertThat(call).isNotNull();
verifyResourceMetadataRequested(LDS, "A");
verifyResourceMetadataRequested(LDS, "B");
verifyResourceMetadataRequested(LDS, "C");
verifyResourceMetadataRequested(RDS, "A.1");
verifyResourceMetadataRequested(RDS, "B.1");
verifyResourceMetadataRequested(RDS, "C.1");
verifySubscribedResourcesMetadataSizes(3, 0, 3, 0);
// LDS -> {A, B, C}, version 1
ImmutableMap<String, Any> resourcesV1 = ImmutableMap.of(
"A", Any.pack(mf.buildListenerWithApiListenerForRds("A", "A.1")),
"B", Any.pack(mf.buildListenerWithApiListenerForRds("B", "B.1")),
"C", Any.pack(mf.buildListenerWithApiListenerForRds("C", "C.1")));
call.sendResponse(LDS, resourcesV1.values().asList(), VERSION_1, "0000");
// {A, B, C} -> ACK, version 1
verifyResourceMetadataAcked(LDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT);
verifyResourceMetadataAcked(LDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT);
verifyResourceMetadataAcked(LDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT);
call.verifyRequest(LDS, subscribedResourceNames, VERSION_1, "0000", NODE);
// RDS -> {A.1, B.1, C.1}, version 1
List<Message> vhostsV1 = mf.buildOpaqueVirtualHosts(1);
ImmutableMap<String, Any> resourcesV11 = ImmutableMap.of(
"A.1", Any.pack(mf.buildRouteConfiguration("A.1", vhostsV1)),
"B.1", Any.pack(mf.buildRouteConfiguration("B.1", vhostsV1)),
"C.1", Any.pack(mf.buildRouteConfiguration("C.1", vhostsV1)));
call.sendResponse(RDS, resourcesV11.values().asList(), VERSION_1, "0000");
// {A.1, B.1, C.1} -> ACK, version 1
verifyResourceMetadataAcked(RDS, "A.1", resourcesV11.get("A.1"), VERSION_1, TIME_INCREMENT * 2);
verifyResourceMetadataAcked(RDS, "B.1", resourcesV11.get("B.1"), VERSION_1, TIME_INCREMENT * 2);
verifyResourceMetadataAcked(RDS, "C.1", resourcesV11.get("C.1"), VERSION_1, TIME_INCREMENT * 2);
// LDS -> {A, B}, version 2
// Failed to parse endpoint B
ImmutableMap<String, Any> resourcesV2 = ImmutableMap.of(
"A", Any.pack(mf.buildListenerWithApiListenerForRds("A", "A.2")),
"B", Any.pack(mf.buildListenerWithApiListenerInvalid("B")));
call.sendResponse(LDS, resourcesV2.values().asList(), VERSION_2, "0001");
// {A} -> ACK, version 2
// {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B
// {C} -> does not exist
List<String> errorsV2 = ImmutableList.of("LDS response Listener 'B' validation error: ");
verifyResourceMetadataAcked(LDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 3);
verifyResourceMetadataNacked(
LDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, VERSION_2, TIME_INCREMENT * 3,
errorsV2);
verifyResourceMetadataDoesNotExist(LDS, "C");
call.verifyRequestNack(LDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2);
// {A.1} -> does not exist
// {B.1} -> version 1
// {C.1} -> does not exist
verifyResourceMetadataDoesNotExist(RDS, "A.1");
verifyResourceMetadataAcked(RDS, "B.1", resourcesV11.get("B.1"), VERSION_1, TIME_INCREMENT * 2);
verifyResourceMetadataDoesNotExist(RDS, "C.1");
}
@Test
public void ldsResourceFound_containsVirtualHosts() {
DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher);
@ -807,11 +874,11 @@ public abstract class ClientXdsClientTestBase {
List<String> errors = ImmutableList.of(
"RDS response Resource index 0 - can't decode RouteConfiguration: ",
"RDS response Resource index 2 - can't decode RouteConfiguration: ");
verifyResourceMetadataNacked(RDS, RDS_RESOURCE, null, "", 0, VERSION_1, TIME_INCREMENT, errors);
verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 0, 1, 0);
// The response is NACKed with the same error message.
call.verifyRequestNack(RDS, RDS_RESOURCE, "", "0000", NODE, errors);
verifyNoInteractions(rdsResourceWatcher);
verify(rdsResourceWatcher).onChanged(any(RdsUpdate.class));
}
/**
@ -852,12 +919,12 @@ public abstract class ClientXdsClientTestBase {
"A", Any.pack(mf.buildRouteConfiguration("A", mf.buildOpaqueVirtualHosts(2))),
"B", Any.pack(mf.buildRouteConfigurationInvalid("B")));
call.sendResponse(RDS, resourcesV2.values().asList(), VERSION_2, "0001");
// {A, B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B
// {A} -> ACK, version 2
// {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B
// {C} -> ACK, version 1
List<String> errorsV2 =
ImmutableList.of("RDS response RouteConfiguration 'B' validation error: ");
verifyResourceMetadataNacked(RDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT,
VERSION_2, TIME_INCREMENT * 2, errorsV2);
verifyResourceMetadataAcked(RDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2);
verifyResourceMetadataNacked(RDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT,
VERSION_2, TIME_INCREMENT * 2, errorsV2);
verifyResourceMetadataAcked(RDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT);
@ -869,10 +936,9 @@ public abstract class ClientXdsClientTestBase {
"B", Any.pack(mf.buildRouteConfiguration("B", vhostsV3)),
"C", Any.pack(mf.buildRouteConfiguration("C", vhostsV3)));
call.sendResponse(RDS, resourcesV3.values().asList(), VERSION_3, "0002");
// {A} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B
// {A} -> ACK, version 2
// {B, C} -> ACK, version 3
verifyResourceMetadataNacked(RDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT,
VERSION_2, TIME_INCREMENT * 2, errorsV2);
verifyResourceMetadataAcked(RDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2);
verifyResourceMetadataAcked(RDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3);
verifyResourceMetadataAcked(RDS, "C", resourcesV3.get("C"), VERSION_3, TIME_INCREMENT * 3);
call.verifyRequest(RDS, subscribedResourceNames, VERSION_3, "0002", NODE);
@ -1146,11 +1212,12 @@ public abstract class ClientXdsClientTestBase {
List<String> errors = ImmutableList.of(
"CDS response Resource index 0 - can't decode Cluster: ",
"CDS response Resource index 2 - can't decode Cluster: ");
verifyResourceMetadataNacked(CDS, CDS_RESOURCE, null, "", 0, VERSION_1, TIME_INCREMENT, errors);
verifyResourceMetadataAcked(
CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
// The response is NACKed with the same error message.
call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, errors);
verifyNoInteractions(cdsResourceWatcher);
verify(cdsResourceWatcher).onChanged(any(CdsUpdate.class));
}
/**
@ -1198,14 +1265,14 @@ public abstract class ClientXdsClientTestBase {
)),
"B", Any.pack(mf.buildClusterInvalid("B")));
call.sendResponse(CDS, resourcesV2.values().asList(), VERSION_2, "0001");
// {A, B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B
// {C} -> ACK, version 1
// {A} -> ACK, version 2
// {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B
// {C} -> does not exist
List<String> errorsV2 = ImmutableList.of("CDS response Cluster 'B' validation error: ");
verifyResourceMetadataNacked(CDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT,
VERSION_2, TIME_INCREMENT * 2, errorsV2);
verifyResourceMetadataAcked(CDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2);
verifyResourceMetadataNacked(CDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT,
VERSION_2, TIME_INCREMENT * 2, errorsV2);
verifyResourceMetadataAcked(CDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT);
verifyResourceMetadataDoesNotExist(CDS, "C");
call.verifyRequestNack(CDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2);
// CDS -> {B, C} version 3
@ -1217,7 +1284,7 @@ public abstract class ClientXdsClientTestBase {
"envoy.transport_sockets.tls", null
)));
call.sendResponse(CDS, resourcesV3.values().asList(), VERSION_3, "0002");
// {A} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B
// {A} -> does not exit
// {B, C} -> ACK, version 3
verifyResourceMetadataDoesNotExist(CDS, "A");
verifyResourceMetadataAcked(CDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3);
@ -1225,6 +1292,82 @@ public abstract class ClientXdsClientTestBase {
call.verifyRequest(CDS, subscribedResourceNames, VERSION_3, "0002", NODE);
}
@Test
public void cdsResponseErrorHandling_subscribedResourceInvalid_withEdsSubscription() {
List<String> subscribedResourceNames = ImmutableList.of("A", "B", "C");
xdsClient.watchCdsResource("A", cdsResourceWatcher);
xdsClient.watchEdsResource("A.1", edsResourceWatcher);
xdsClient.watchCdsResource("B", cdsResourceWatcher);
xdsClient.watchEdsResource("B.1", edsResourceWatcher);
xdsClient.watchCdsResource("C", cdsResourceWatcher);
xdsClient.watchEdsResource("C.1", edsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
assertThat(call).isNotNull();
verifyResourceMetadataRequested(CDS, "A");
verifyResourceMetadataRequested(CDS, "B");
verifyResourceMetadataRequested(CDS, "C");
verifyResourceMetadataRequested(EDS, "A.1");
verifyResourceMetadataRequested(EDS, "B.1");
verifyResourceMetadataRequested(EDS, "C.1");
verifySubscribedResourcesMetadataSizes(0, 3, 0, 3);
// CDS -> {A, B, C}, version 1
ImmutableMap<String, Any> resourcesV1 = ImmutableMap.of(
"A", Any.pack(mf.buildEdsCluster("A", "A.1", "round_robin", null, false, null,
"envoy.transport_sockets.tls", null
)),
"B", Any.pack(mf.buildEdsCluster("B", "B.1", "round_robin", null, false, null,
"envoy.transport_sockets.tls", null
)),
"C", Any.pack(mf.buildEdsCluster("C", "C.1", "round_robin", null, false, null,
"envoy.transport_sockets.tls", null
)));
call.sendResponse(CDS, resourcesV1.values().asList(), VERSION_1, "0000");
// {A, B, C} -> ACK, version 1
verifyResourceMetadataAcked(CDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT);
verifyResourceMetadataAcked(CDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT);
verifyResourceMetadataAcked(CDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT);
call.verifyRequest(CDS, subscribedResourceNames, VERSION_1, "0000", NODE);
// EDS -> {A.1, B.1, C.1}, version 1
List<Message> dropOverloads = ImmutableList.of();
List<Message> endpointsV1 = ImmutableList.of(lbEndpointHealthy);
ImmutableMap<String, Any> resourcesV11 = ImmutableMap.of(
"A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads)),
"B.1", Any.pack(mf.buildClusterLoadAssignment("B.1", endpointsV1, dropOverloads)),
"C.1", Any.pack(mf.buildClusterLoadAssignment("C.1", endpointsV1, dropOverloads)));
call.sendResponse(EDS, resourcesV11.values().asList(), VERSION_1, "0000");
// {A.1, B.1, C.1} -> ACK, version 1
verifyResourceMetadataAcked(EDS, "A.1", resourcesV11.get("A.1"), VERSION_1, TIME_INCREMENT * 2);
verifyResourceMetadataAcked(EDS, "B.1", resourcesV11.get("B.1"), VERSION_1, TIME_INCREMENT * 2);
verifyResourceMetadataAcked(EDS, "C.1", resourcesV11.get("C.1"), VERSION_1, TIME_INCREMENT * 2);
// CDS -> {A, B}, version 2
// Failed to parse endpoint B
ImmutableMap<String, Any> resourcesV2 = ImmutableMap.of(
"A", Any.pack(mf.buildEdsCluster("A", "A.2", "round_robin", null, false, null,
"envoy.transport_sockets.tls", null
)),
"B", Any.pack(mf.buildClusterInvalid("B")));
call.sendResponse(CDS, resourcesV2.values().asList(), VERSION_2, "0001");
// {A} -> ACK, version 2
// {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B
// {C} -> does not exist
List<String> errorsV2 = ImmutableList.of("CDS response Cluster 'B' validation error: ");
verifyResourceMetadataAcked(CDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 3);
verifyResourceMetadataNacked(
CDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, VERSION_2, TIME_INCREMENT * 3,
errorsV2);
verifyResourceMetadataDoesNotExist(CDS, "C");
call.verifyRequestNack(CDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2);
// {A.1} -> does not exist
// {B.1} -> version 1
// {C.1} -> does not exist
verifyResourceMetadataDoesNotExist(EDS, "A.1");
verifyResourceMetadataAcked(EDS, "B.1", resourcesV11.get("B.1"), VERSION_1, TIME_INCREMENT * 2);
verifyResourceMetadataDoesNotExist(EDS, "C.1");
}
@Test
public void cdsResourceFound() {
DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher);
@ -1666,11 +1809,14 @@ public abstract class ClientXdsClientTestBase {
List<String> errors = ImmutableList.of(
"EDS response Resource index 0 - can't decode ClusterLoadAssignment: ",
"EDS response Resource index 2 - can't decode ClusterLoadAssignment: ");
verifyResourceMetadataNacked(EDS, EDS_RESOURCE, null, "", 0, VERSION_1, TIME_INCREMENT, errors);
verifyResourceMetadataAcked(
EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
// The response is NACKed with the same error message.
call.verifyRequestNack(EDS, EDS_RESOURCE, "", "0000", NODE, errors);
verifyNoInteractions(edsResourceWatcher);
verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
EdsUpdate edsUpdate = edsUpdateCaptor.getValue();
assertThat(edsUpdate.clusterName).isEqualTo(EDS_RESOURCE);
}
/**
@ -1713,12 +1859,12 @@ public abstract class ClientXdsClientTestBase {
"A", Any.pack(mf.buildClusterLoadAssignment("A", endpointsV2, dropOverloads)),
"B", Any.pack(mf.buildClusterLoadAssignmentInvalid("B")));
call.sendResponse(EDS, resourcesV2.values().asList(), VERSION_2, "0001");
// {A, B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B
// {A} -> ACK, version 2
// {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B
// {C} -> ACK, version 1
List<String> errorsV2 =
ImmutableList.of("EDS response ClusterLoadAssignment 'B' validation error: ");
verifyResourceMetadataNacked(EDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT,
VERSION_2, TIME_INCREMENT * 2, errorsV2);
verifyResourceMetadataAcked(EDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2);
verifyResourceMetadataNacked(EDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT,
VERSION_2, TIME_INCREMENT * 2, errorsV2);
verifyResourceMetadataAcked(EDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT);
@ -1731,10 +1877,9 @@ public abstract class ClientXdsClientTestBase {
"B", Any.pack(mf.buildClusterLoadAssignment("B", endpointsV3, dropOverloads)),
"C", Any.pack(mf.buildClusterLoadAssignment("C", endpointsV3, dropOverloads)));
call.sendResponse(EDS, resourcesV3.values().asList(), VERSION_3, "0002");
// {A} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B
// {A} -> ACK, version 2
// {B, C} -> ACK, version 3
verifyResourceMetadataNacked(EDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT,
VERSION_2, TIME_INCREMENT * 2, errorsV2);
verifyResourceMetadataAcked(EDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2);
verifyResourceMetadataAcked(EDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3);
verifyResourceMetadataAcked(EDS, "C", resourcesV3.get("C"), VERSION_3, TIME_INCREMENT * 3);
call.verifyRequest(EDS, subscribedResourceNames, VERSION_3, "0002", NODE);