Skip to content

Commit 342c6e1

Browse files
committed
xds: implement server feature fail_on_data_errors
1 parent f385add commit 342c6e1

9 files changed

Lines changed: 236 additions & 34 deletions

xds/src/main/java/io/grpc/xds/client/Bootstrapper.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,18 +65,22 @@ public abstract static class ServerInfo {
6565

6666
public abstract boolean resourceTimerIsTransientError();
6767

68+
public abstract boolean failOnDataErrors();
69+
6870
@VisibleForTesting
6971
public static ServerInfo create(String target, @Nullable Object implSpecificConfig) {
7072
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig,
71-
false, false, false);
73+
false, false, false, false);
7274
}
7375

7476
@VisibleForTesting
7577
public static ServerInfo create(
76-
String target, Object implSpecificConfig, boolean ignoreResourceDeletion,
77-
boolean isTrustedXdsServer, boolean resourceTimerIsTransientError) {
78+
String target, Object implSpecificConfig,
79+
boolean ignoreResourceDeletion, boolean isTrustedXdsServer,
80+
boolean resourceTimerIsTransientError, boolean failOnDataErrors) {
7881
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig,
79-
ignoreResourceDeletion, isTrustedXdsServer, resourceTimerIsTransientError);
82+
ignoreResourceDeletion, isTrustedXdsServer,
83+
resourceTimerIsTransientError, failOnDataErrors);
8084
}
8185
}
8286

xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public abstract class BootstrapperImpl extends Bootstrapper {
5858
private static final String SERVER_FEATURE_TRUSTED_XDS_SERVER = "trusted_xds_server";
5959
private static final String
6060
SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR = "resource_timer_is_transient_error";
61+
private static final String SERVER_FEATURE_FAIL_ON_DATA_ERRORS = "fail_on_data_errors";
6162

6263
@VisibleForTesting
6364
static boolean enableXdsFallback = GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_FALLBACK, true);
@@ -257,6 +258,7 @@ private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger lo
257258

258259
boolean resourceTimerIsTransientError = false;
259260
boolean ignoreResourceDeletion = false;
261+
boolean failOnDataErrors = false;
260262
// "For forward compatibility reasons, the client will ignore any entry in the list that it
261263
// does not understand, regardless of type."
262264
List<?> serverFeatures = JsonUtil.getList(serverConfig, "server_features");
@@ -267,12 +269,14 @@ private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger lo
267269
}
268270
resourceTimerIsTransientError = xdsDataErrorHandlingEnabled
269271
&& serverFeatures.contains(SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR);
272+
failOnDataErrors = xdsDataErrorHandlingEnabled
273+
&& serverFeatures.contains(SERVER_FEATURE_FAIL_ON_DATA_ERRORS);
270274
}
271275
servers.add(
272276
ServerInfo.create(serverUri, implSpecificConfig, ignoreResourceDeletion,
273277
serverFeatures != null
274278
&& serverFeatures.contains(SERVER_FEATURE_TRUSTED_XDS_SERVER),
275-
resourceTimerIsTransientError));
279+
resourceTimerIsTransientError, failOnDataErrors));
276280
}
277281
return servers.build();
278282
}

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -603,16 +603,22 @@ private <T extends ResourceUpdate> void handleResourceUpdate(
603603
}
604604

605605
if (invalidResources.contains(resourceName)) {
606-
// The resource update is invalid. Capture the error without notifying the watchers.
606+
// The resource update is invalid (NACK). Handle as a data error.
607607
subscriber.onRejected(args.versionInfo, updateTime, errorDetail);
608-
}
609-
610-
if (invalidResources.contains(resourceName)) {
611-
// The resource is missing. Reuse the cached resource if possible.
612-
if (subscriber.data == null) {
613-
// No cached data. Notify the watchers of an invalid update.
614-
subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), processingTracker);
608+
609+
// Handle data errors (NACKs) based on fail_on_data_errors server feature.
610+
// When xdsDataErrorHandlingEnabled is true and fail_on_data_errors is present,
611+
// delete cached data so onError will call onResourceChanged instead of onAmbientError.
612+
// When xdsDataErrorHandlingEnabled is false, use old behavior (always keep cached data).
613+
boolean xdsDataErrorHandlingEnabled =
614+
io.grpc.xds.client.BootstrapperImpl.xdsDataErrorHandlingEnabled;
615+
if (xdsDataErrorHandlingEnabled && subscriber.data != null
616+
&& args.serverInfo.failOnDataErrors()) {
617+
subscriber.data = null;
615618
}
619+
// Call onError, which will decide whether to call onResourceChanged or onAmbientError
620+
// based on whether data exists after the above deletion.
621+
subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), processingTracker);
616622
continue;
617623
}
618624

@@ -866,20 +872,43 @@ void onAbsent(@Nullable ProcessingTracker processingTracker, ServerInfo serverIn
866872
return;
867873
}
868874

869-
// Ignore deletion of State of the World resources when this feature is on,
870-
// and the resource is reusable.
875+
// Handle data errors (resource deletions) based on fail_on_data_errors server feature.
876+
// When xdsDataErrorHandlingEnabled is true and fail_on_data_errors is not present,
877+
// we treat deletions as ambient errors and keep using the cached resource.
878+
// When fail_on_data_errors is present, we delete the cached resource and fail.
879+
// When xdsDataErrorHandlingEnabled is false, use the old behavior (ignore_resource_deletion).
871880
boolean ignoreResourceDeletionEnabled = serverInfo.ignoreResourceDeletion();
872-
if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
873-
if (!resourceDeletionIgnored) {
874-
logger.log(XdsLogLevel.FORCE_WARNING,
875-
"xds server {0}: ignoring deletion for resource type {1} name {2}}",
876-
serverInfo.target(), type, resource);
877-
resourceDeletionIgnored = true;
881+
boolean failOnDataErrors = serverInfo.failOnDataErrors();
882+
boolean xdsDataErrorHandlingEnabled =
883+
io.grpc.xds.client.BootstrapperImpl.xdsDataErrorHandlingEnabled;
884+
885+
if (type.isFullStateOfTheWorld() && data != null) {
886+
// New behavior (per gRFC A88): Default is to treat deletions as ambient errors
887+
if (xdsDataErrorHandlingEnabled && !failOnDataErrors) {
888+
if (!resourceDeletionIgnored) {
889+
logger.log(XdsLogLevel.FORCE_WARNING,
890+
"xds server {0}: ignoring deletion for resource type {1} name {2}}",
891+
serverInfo.target(), type, resource);
892+
resourceDeletionIgnored = true;
893+
}
894+
Status deletionStatus = Status.NOT_FOUND.withDescription(
895+
"Resource " + resource + " deleted from server");
896+
onAmbientError(deletionStatus, processingTracker);
897+
return;
898+
}
899+
// Old behavior: Use ignore_resource_deletion server feature
900+
if (!xdsDataErrorHandlingEnabled && ignoreResourceDeletionEnabled) {
901+
if (!resourceDeletionIgnored) {
902+
logger.log(XdsLogLevel.FORCE_WARNING,
903+
"xds server {0}: ignoring deletion for resource type {1} name {2}}",
904+
serverInfo.target(), type, resource);
905+
resourceDeletionIgnored = true;
906+
}
907+
Status deletionStatus = Status.NOT_FOUND.withDescription(
908+
"Resource " + resource + " deleted from server");
909+
onAmbientError(deletionStatus, processingTracker);
910+
return;
878911
}
879-
Status deletionStatus = Status.NOT_FOUND.withDescription(
880-
"Resource " + resource + " deleted from server");
881-
onAmbientError(deletionStatus, processingTracker);
882-
return;
883912
}
884913

885914
logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);

xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,52 @@ public void serverFeatures_ignoresUnknownValues() throws XdsInitializationExcept
723723
assertThat(serverInfo.isTrustedXdsServer()).isTrue();
724724
}
725725

726+
@Test
727+
public void serverFeature_failOnDataErrors() throws XdsInitializationException {
728+
BootstrapperImpl.xdsDataErrorHandlingEnabled = true;
729+
String rawData = "{\n"
730+
+ " \"xds_servers\": [\n"
731+
+ " {\n"
732+
+ " \"server_uri\": \"" + SERVER_URI + "\",\n"
733+
+ " \"channel_creds\": [\n"
734+
+ " {\"type\": \"insecure\"}\n"
735+
+ " ],\n"
736+
+ " \"server_features\": [\"fail_on_data_errors\"]\n"
737+
+ " }\n"
738+
+ " ]\n"
739+
+ "}";
740+
741+
bootstrapper.setFileReader(createFileReader(BOOTSTRAP_FILE_PATH, rawData));
742+
BootstrapInfo info = bootstrapper.bootstrap();
743+
ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
744+
assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
745+
assertThat(serverInfo.implSpecificConfig()).isInstanceOf(InsecureChannelCredentials.class);
746+
assertThat(serverInfo.failOnDataErrors()).isTrue();
747+
BootstrapperImpl.xdsDataErrorHandlingEnabled = false;
748+
}
749+
750+
@Test
751+
public void serverFeature_failOnDataErrors_requiresEnvVar() throws XdsInitializationException {
752+
BootstrapperImpl.xdsDataErrorHandlingEnabled = false;
753+
String rawData = "{\n"
754+
+ " \"xds_servers\": [\n"
755+
+ " {\n"
756+
+ " \"server_uri\": \"" + SERVER_URI + "\",\n"
757+
+ " \"channel_creds\": [\n"
758+
+ " {\"type\": \"insecure\"}\n"
759+
+ " ],\n"
760+
+ " \"server_features\": [\"fail_on_data_errors\"]\n"
761+
+ " }\n"
762+
+ " ]\n"
763+
+ "}";
764+
765+
bootstrapper.setFileReader(createFileReader(BOOTSTRAP_FILE_PATH, rawData));
766+
BootstrapInfo info = bootstrapper.bootstrap();
767+
ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
768+
// Should be false when env var is not enabled
769+
assertThat(serverInfo.failOnDataErrors()).isFalse();
770+
}
771+
726772
@Test
727773
public void notFound() {
728774
bootstrapper.bootstrapPathFromEnvVar = null;

xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3614,7 +3614,7 @@ private static Filter buildHttpConnectionManagerFilter(HttpFilter... httpFilters
36143614

36153615
private XdsResourceType.Args getXdsResourceTypeArgs(boolean isTrustedServer) {
36163616
return new XdsResourceType.Args(
3617-
ServerInfo.create("http://td", "", false, isTrustedServer, false), "1.0", null, null, null, null
3617+
ServerInfo.create("http://td", "", false, isTrustedServer, false, false), "1.0", null, null, null, null
36183618
);
36193619
}
36203620
}

xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java

Lines changed: 122 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ public void setUp() throws IOException {
362362
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
363363

364364
xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, ignoreResourceDeletion(),
365-
true, false);
365+
true, false, false);
366366
BootstrapInfo bootstrapInfo =
367367
Bootstrapper.BootstrapInfo.builder()
368368
.servers(Collections.singletonList(xdsServerInfo))
@@ -1448,6 +1448,66 @@ public void ldsResourceDeleted_ignoreResourceDeletion() {
14481448
verifyNoMoreInteractions(ldsResourceWatcher);
14491449
}
14501450

1451+
/**
1452+
* When fail_on_data_errors server feature is on, xDS client should delete the cached listener
1453+
* and fail RPCs when LDS resource is deleted.
1454+
*/
1455+
@Test
1456+
public void ldsResourceDeleted_failOnDataErrors() {
1457+
BootstrapperImpl.xdsDataErrorHandlingEnabled = true;
1458+
xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, false,
1459+
true, false, true);
1460+
BootstrapInfo bootstrapInfo =
1461+
Bootstrapper.BootstrapInfo.builder()
1462+
.servers(Collections.singletonList(xdsServerInfo))
1463+
.node(NODE)
1464+
.authorities(ImmutableMap.of(
1465+
"",
1466+
AuthorityInfo.create(
1467+
"xdstp:///envoy.config.listener.v3.Listener/%s",
1468+
ImmutableList.of(Bootstrapper.ServerInfo.create(
1469+
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
1470+
.certProviders(ImmutableMap.of())
1471+
.build();
1472+
xdsClient = new XdsClientImpl(
1473+
xdsTransportFactory,
1474+
bootstrapInfo,
1475+
fakeClock.getScheduledExecutorService(),
1476+
backoffPolicyProvider,
1477+
fakeClock.getStopwatchSupplier(),
1478+
timeProvider,
1479+
MessagePrinter.INSTANCE,
1480+
new TlsContextManagerImpl(bootstrapInfo),
1481+
xdsClientMetricReporter);
1482+
1483+
InOrder inOrder = inOrder(ldsResourceWatcher);
1484+
DiscoveryRpcCall call = startResourceWatcher(XdsListenerResource.getInstance(), LDS_RESOURCE,
1485+
ldsResourceWatcher);
1486+
verifyResourceMetadataRequested(LDS, LDS_RESOURCE);
1487+
1488+
// Initial LDS response.
1489+
call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
1490+
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
1491+
inOrder.verify(ldsResourceWatcher).onResourceChanged(ldsUpdateCaptor.capture());
1492+
StatusOr<LdsUpdate> statusOrUpdate = ldsUpdateCaptor.getValue();
1493+
assertThat(statusOrUpdate.hasValue()).isTrue();
1494+
verifyGoldenListenerVhosts(statusOrUpdate.getValue());
1495+
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
1496+
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
1497+
1498+
// Empty LDS response deletes the listener and fails RPCs.
1499+
call.sendResponse(LDS, Collections.<Any>emptyList(), VERSION_2, "0001");
1500+
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_2, "0001", NODE);
1501+
inOrder.verify(ldsResourceWatcher).onResourceChanged(ldsUpdateCaptor.capture());
1502+
StatusOr<LdsUpdate> statusOrUpdate1 = ldsUpdateCaptor.getValue();
1503+
assertThat(statusOrUpdate1.hasValue()).isFalse();
1504+
assertThat(statusOrUpdate1.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
1505+
verifyResourceMetadataDoesNotExist(LDS, LDS_RESOURCE);
1506+
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
1507+
1508+
BootstrapperImpl.xdsDataErrorHandlingEnabled = false;
1509+
}
1510+
14511511
@Test
14521512
@SuppressWarnings("unchecked")
14531513
public void multipleLdsWatchers() {
@@ -2972,6 +3032,64 @@ public void cdsResourceDeleted_ignoreResourceDeletion() {
29723032
verifyNoMoreInteractions(ldsResourceWatcher);
29733033
}
29743034

3035+
/**
3036+
* When fail_on_data_errors server feature is on, xDS client should delete the cached cluster
3037+
* and fail RPCs when CDS resource is deleted.
3038+
*/
3039+
@Test
3040+
public void cdsResourceDeleted_failOnDataErrors() {
3041+
BootstrapperImpl.xdsDataErrorHandlingEnabled = true;
3042+
xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, false,
3043+
true, false, true);
3044+
BootstrapInfo bootstrapInfo =
3045+
Bootstrapper.BootstrapInfo.builder()
3046+
.servers(Collections.singletonList(xdsServerInfo))
3047+
.node(NODE)
3048+
.authorities(ImmutableMap.of(
3049+
"",
3050+
AuthorityInfo.create(
3051+
"xdstp:///envoy.config.listener.v3.Listener/%s",
3052+
ImmutableList.of(Bootstrapper.ServerInfo.create(
3053+
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
3054+
.certProviders(ImmutableMap.of())
3055+
.build();
3056+
xdsClient = new XdsClientImpl(
3057+
xdsTransportFactory,
3058+
bootstrapInfo,
3059+
fakeClock.getScheduledExecutorService(),
3060+
backoffPolicyProvider,
3061+
fakeClock.getStopwatchSupplier(),
3062+
timeProvider,
3063+
MessagePrinter.INSTANCE,
3064+
new TlsContextManagerImpl(bootstrapInfo),
3065+
xdsClientMetricReporter);
3066+
3067+
DiscoveryRpcCall call = startResourceWatcher(XdsClusterResource.getInstance(), CDS_RESOURCE,
3068+
cdsResourceWatcher);
3069+
verifyResourceMetadataRequested(CDS, CDS_RESOURCE);
3070+
3071+
// Initial CDS response.
3072+
call.sendResponse(CDS, testClusterRoundRobin, VERSION_1, "0000");
3073+
call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
3074+
verify(cdsResourceWatcher).onResourceChanged(cdsUpdateCaptor.capture());
3075+
StatusOr<CdsUpdate> statusOrUpdate = cdsUpdateCaptor.getValue();
3076+
assertThat(statusOrUpdate.hasValue()).isTrue();
3077+
verifyGoldenClusterRoundRobin(statusOrUpdate.getValue());
3078+
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
3079+
TIME_INCREMENT);
3080+
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
3081+
3082+
// Empty CDS response deletes the cluster and fails RPCs.
3083+
call.sendResponse(CDS, Collections.<Any>emptyList(), VERSION_2, "0001");
3084+
call.verifyRequest(CDS, CDS_RESOURCE, VERSION_2, "0001", NODE);
3085+
verify(cdsResourceWatcher).onResourceChanged(argThat(
3086+
arg -> !arg.hasValue() && arg.getStatus().getDescription().contains(CDS_RESOURCE)));
3087+
verifyResourceMetadataDoesNotExist(CDS, CDS_RESOURCE);
3088+
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
3089+
3090+
BootstrapperImpl.xdsDataErrorHandlingEnabled = false;
3091+
}
3092+
29753093
@Test
29763094
@SuppressWarnings("unchecked")
29773095
public void multipleCdsWatchers() {
@@ -3369,7 +3487,7 @@ public void flowControlAbsent() throws Exception {
33693487
public void resourceTimerIsTransientError_schedulesExtendedTimeout() {
33703488
BootstrapperImpl.xdsDataErrorHandlingEnabled = true;
33713489
ServerInfo serverInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS,
3372-
false, true, true);
3490+
false, true, true, false);
33733491
BootstrapInfo bootstrapInfo =
33743492
Bootstrapper.BootstrapInfo.builder()
33753493
.servers(Collections.singletonList(serverInfo))
@@ -3414,7 +3532,7 @@ public void resourceTimerIsTransientError_schedulesExtendedTimeout() {
34143532
public void resourceTimerIsTransientError_callsOnErrorUnavailable() {
34153533
BootstrapperImpl.xdsDataErrorHandlingEnabled = true;
34163534
xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, ignoreResourceDeletion(),
3417-
true, true);
3535+
true, true, false);
34183536
BootstrapInfo bootstrapInfo =
34193537
Bootstrapper.BootstrapInfo.builder()
34203538
.servers(Collections.singletonList(xdsServerInfo))
@@ -4644,7 +4762,7 @@ private XdsClientImpl createXdsClient(String serverUri) {
46444762
private BootstrapInfo buildBootStrap(String serverUri) {
46454763

46464764
ServerInfo xdsServerInfo = ServerInfo.create(serverUri, CHANNEL_CREDENTIALS,
4647-
ignoreResourceDeletion(), true, false);
4765+
ignoreResourceDeletion(), true, false, false);
46484766

46494767
return Bootstrapper.BootstrapInfo.builder()
46504768
.servers(Collections.singletonList(xdsServerInfo))

xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,13 +358,14 @@ public void resolving_targetAuthorityInAuthoritiesMap() {
358358
String serviceAuthority = "[::FFFF:129.144.52.38]:80";
359359
bootstrapInfo = BootstrapInfo.builder()
360360
.servers(ImmutableList.of(ServerInfo.create(
361-
"td.googleapis.com", InsecureChannelCredentials.create(), true, true, false)))
361+
"td.googleapis.com", InsecureChannelCredentials.create(), true, true, false, false)))
362362
.node(Node.newBuilder().build())
363363
.authorities(
364364
ImmutableMap.of(targetAuthority, AuthorityInfo.create(
365365
"xdstp://" + targetAuthority + "/envoy.config.listener.v3.Listener/%s?foo=1&bar=2",
366366
ImmutableList.of(ServerInfo.create(
367-
"td.googleapis.com", InsecureChannelCredentials.create(), true, true, false)))))
367+
"td.googleapis.com", InsecureChannelCredentials.create(),
368+
true, true, false, false)))))
368369
.build();
369370
expectedLdsResourceName = "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/"
370371
+ "%5B::FFFF:129.144.52.38%5D:80?bar=2&foo=1"; // query param canonified

0 commit comments

Comments
 (0)