From bc65b77179548b6f21d57363a0fcf051932629a9 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Tue, 22 Apr 2025 18:42:34 -0700 Subject: [PATCH 1/2] xds: XdsDepManager should ignore updates after shutdown This prevents an NPE and subsequent channel panic when trying to build a config (because there are no watchers, so waitingOnResource==false) without any listener and route. ``` java.lang.NullPointerException: Cannot invoke "io.grpc.xds.XdsDependencyManager$RdsUpdateSupplier.getRdsUpdate()" because "routeSource" is null at io.grpc.xds.XdsDependencyManager.buildUpdate(XdsDependencyManager.java:295) at io.grpc.xds.XdsDependencyManager.maybePublishConfig(XdsDependencyManager.java:266) at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:899) at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:888) at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.notifyWatcher(XdsClientImpl.java:929) at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.lambda$onData$0(XdsClientImpl.java:837) at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:96) ``` I think this fully fixes the problem today, but not tomorrow. subscribeToCluster() is racy as well, but not yet used. This was noticed when idleTimeout was firing, with some other code calling getState(true) to wake the channel back up. That may have made this panic more visible than it would be otherwise, but that has not been investigated. 
b/412474567 --- .../io/grpc/xds/XdsDependencyManager.java | 16 ++++++ .../io/grpc/xds/XdsDependencyManagerTest.java | 51 ++++++++++++++++++- 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java index 8cd3119727d..d804954ecf9 100644 --- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -199,6 +199,7 @@ private void shutdownWatchersForType(TypeWatchers for (Map.Entry> watcherEntry : watchers.watchers.entrySet()) { xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(), watcherEntry.getValue()); + watcherEntry.getValue().cancelled = true; } } @@ -591,6 +592,9 @@ private XdsWatcherBase(XdsResourceType type, String resourceName) { @Override public void onError(Status error) { checkNotNull(error, "error"); + if (cancelled) { + return; + } // Don't update configuration on error, if we've already received configuration if (!hasDataValue()) { setDataAsStatus(Status.UNAVAILABLE.withDescription( @@ -659,6 +663,9 @@ private LdsWatcher(String resourceName) { @Override public void onChanged(XdsListenerResource.LdsUpdate update) { checkNotNull(update, "update"); + if (cancelled) { + return; + } HttpConnectionManager httpConnectionManager = update.httpConnectionManager(); List virtualHosts; @@ -787,6 +794,9 @@ public RdsWatcher(String resourceName) { @Override public void onChanged(RdsUpdate update) { checkNotNull(update, "update"); + if (cancelled) { + return; + } List oldVirtualHosts = hasDataValue() ? 
getData().getValue().virtualHosts : Collections.emptyList(); @@ -815,6 +825,9 @@ private class CdsWatcher extends XdsWatcherBase { @Override public void onChanged(XdsClusterResource.CdsUpdate update) { checkNotNull(update, "update"); + if (cancelled) { + return; + } switch (update.clusterType()) { case EDS: setData(update); @@ -895,6 +908,9 @@ private EdsWatcher(String resourceName, CdsWatcher parentContext) { @Override public void onChanged(XdsEndpointResource.EdsUpdate update) { + if (cancelled) { + return; + } setData(checkNotNull(update, "update")); maybePublishConfig(); } diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java index 2af04a3aedf..815cb06e3cd 100644 --- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -41,6 +41,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Message; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; @@ -65,7 +66,7 @@ import io.grpc.xds.XdsConfig.XdsClusterConfig; import io.grpc.xds.XdsEndpointResource.EdsUpdate; import io.grpc.xds.client.CommonBootstrapperTestUtils; -import io.grpc.xds.client.XdsClientImpl; +import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClientMetricReporter; import io.grpc.xds.client.XdsTransportFactory; import java.io.Closeable; @@ -115,7 +116,7 @@ public class XdsDependencyManagerTest { }); private ManagedChannel channel; - private XdsClientImpl xdsClient; + private XdsClient xdsClient; private XdsDependencyManager xdsDependencyManager; private TestWatcher xdsConfigWatcher; private Server xdsServer; @@ -715,6 +716,52 @@ public void testCdsError() throws IOException { 
assertThat(status.getDescription()).contains(XdsTestUtils.CLUSTER_NAME); } + @Test + public void updatesAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher serverNameWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsListenerResource.getInstance(), + serverName + "2", + serverNameWatcher, + MoreExecutors.directExecutor()); + + syncContext.execute(() -> { + // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + // Cause an onChanged() for each type, and maybe onResourceDoesNotExist(), going from EDS up + // the tree since updates won't be processed immediately by the dependency manager (we're + // blocking the sync context) + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT); + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS2", + ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT); + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS2", "EDS2", + ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT); + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS2", "EDS2", + ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT); + XdsTestUtils.setAdsConfig(controlPlaneService, serverName + "2", "RDS2", "CDS2", "EDS2", + ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT); + }); + + // Wait for the prior updates to be processed by XdsClient. This can't be done in the + // syncContext as flow control prevents further updates until previous callbacks have completed. 
+ verify(serverNameWatcher, timeout(5000)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsListenerResource.getInstance(), serverName + "2", serverNameWatcher); + } + private Listener buildInlineClientListener(String rdsName, String clusterName) { return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName); } From 7690eee91e186b726ce932d8d6e20b24bae86915 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Wed, 23 Apr 2025 07:02:21 -0700 Subject: [PATCH 2/2] Split up test Flow control prevents all but the first update from occurring before the XdsClient unsubscriptions take effect. --- .../io/grpc/xds/XdsDependencyManagerTest.java | 128 +++++++++++++++--- 1 file changed, 107 insertions(+), 21 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java index 815cb06e3cd..1f3d8511ecc 100644 --- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -717,7 +717,7 @@ public void testCdsError() throws IOException { } @Test - public void updatesAfterShutdown() { + public void ldsUpdateAfterShutdown() { XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", ENDPOINT_HOSTNAME, ENDPOINT_PORT); @@ -727,39 +727,125 @@ public void updatesAfterShutdown() { verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); @SuppressWarnings("unchecked") - XdsClient.ResourceWatcher serverNameWatcher = + XdsClient.ResourceWatcher resourceWatcher = mock(XdsClient.ResourceWatcher.class); xdsClient.watchXdsResource( XdsListenerResource.getInstance(), - serverName + "2", - serverNameWatcher, + serverName, + resourceWatcher, MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. 
This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsListenerResource.getInstance(), serverName, resourceWatcher); + }); + } + + @Test + public void rdsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsRouteConfigureResource.getInstance(), + "RDS", + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. 
This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS2", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsRouteConfigureResource.getInstance(), serverName, resourceWatcher); + }); + } + + @Test + public void cdsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsClusterResource.getInstance(), + "CDS", + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); syncContext.execute(() -> { // Shutdown before any updates. 
This will unsubscribe from XdsClient, but only after this // Runnable returns xdsDependencyManager.shutdown(); - // Cause an onChanged() for each type, and maybe onResourceDoesNotExist(), going from EDS up - // the tree since updates won't be processed immediately by the dependency manager (we're - // blocking the sync context) - XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", - ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT); XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS2", - ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT); - XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS2", "EDS2", - ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT); - XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS2", "EDS2", - ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT); - XdsTestUtils.setAdsConfig(controlPlaneService, serverName + "2", "RDS2", "CDS2", "EDS2", - ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT); + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsClusterResource.getInstance(), serverName, resourceWatcher); }); + } + + @Test + public void edsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); - // Wait for the prior updates to be processed by XdsClient. This can't be done in the - // syncContext as flow control prevents further updates until previous callbacks have completed. 
- verify(serverNameWatcher, timeout(5000)).onChanged(any()); - xdsClient.cancelXdsResourceWatch( - XdsListenerResource.getInstance(), serverName + "2", serverNameWatcher); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsEndpointResource.getInstance(), + "EDS", + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsEndpointResource.getInstance(), serverName, resourceWatcher); + }); } private Listener buildInlineClientListener(String rdsName, String clusterName) {