From fc81ff53a2d5ab99d875b5e929f055868fd3fd62 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Wed, 23 Apr 2025 09:18:08 -0700 Subject: [PATCH] xds: XdsDepManager should ignore updates after shutdown This prevents a NPE and subsequent channel panic when trying to build a config (because there are no watchers, so waitingOnResource==false) without any listener and route. ``` java.lang.NullPointerException: Cannot invoke "io.grpc.xds.XdsDependencyManager$RdsUpdateSupplier.getRdsUpdate()" because "routeSource" is null at io.grpc.xds.XdsDependencyManager.buildUpdate(XdsDependencyManager.java:295) at io.grpc.xds.XdsDependencyManager.maybePublishConfig(XdsDependencyManager.java:266) at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:899) at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:888) at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.notifyWatcher(XdsClientImpl.java:929) at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.lambda$onData$0(XdsClientImpl.java:837) at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:96) ``` I think this fully-fixes the problem today, but not tomorrow. subscribeToCluster() is racy as well, but not yet used. This was noticed when idleTimeout was firing, with some other code calling getState(true) to wake the channel back up. That may have made this panic more visible than it would be otherwise, but that has not been investigated. b/412474567 --- .../io/grpc/xds/XdsDependencyManager.java | 16 ++ .../io/grpc/xds/XdsDependencyManagerTest.java | 137 +++++++++++++++++- 2 files changed, 151 insertions(+), 2 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java index 8cd3119727d..d804954ecf9 100644 --- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -199,6 +199,7 @@ private void shutdownWatchersForType(TypeWatchers for (Map.Entry> watcherEntry : watchers.watchers.entrySet()) { xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(), watcherEntry.getValue()); + watcherEntry.getValue().cancelled = true; } } @@ -591,6 +592,9 @@ private XdsWatcherBase(XdsResourceType type, String resourceName) { @Override public void onError(Status error) { checkNotNull(error, "error"); + if (cancelled) { + return; + } // Don't update configuration on error, if we've already received configuration if (!hasDataValue()) { setDataAsStatus(Status.UNAVAILABLE.withDescription( @@ -659,6 +663,9 @@ private LdsWatcher(String resourceName) { @Override public void onChanged(XdsListenerResource.LdsUpdate update) { checkNotNull(update, "update"); + if (cancelled) { + return; + } HttpConnectionManager httpConnectionManager = update.httpConnectionManager(); List virtualHosts; @@ -787,6 +794,9 @@ public RdsWatcher(String resourceName) { @Override public void onChanged(RdsUpdate update) { checkNotNull(update, "update"); + if (cancelled) { + return; + } List oldVirtualHosts = hasDataValue() ? getData().getValue().virtualHosts : Collections.emptyList(); @@ -815,6 +825,9 @@ private class CdsWatcher extends XdsWatcherBase { @Override public void onChanged(XdsClusterResource.CdsUpdate update) { checkNotNull(update, "update"); + if (cancelled) { + return; + } switch (update.clusterType()) { case EDS: setData(update); @@ -895,6 +908,9 @@ private EdsWatcher(String resourceName, CdsWatcher parentContext) { @Override public void onChanged(XdsEndpointResource.EdsUpdate update) { + if (cancelled) { + return; + } setData(checkNotNull(update, "update")); maybePublishConfig(); } diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java index 2af04a3aedf..1f3d8511ecc 100644 --- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -41,6 +41,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Message; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; @@ -65,7 +66,7 @@ import io.grpc.xds.XdsConfig.XdsClusterConfig; import io.grpc.xds.XdsEndpointResource.EdsUpdate; import io.grpc.xds.client.CommonBootstrapperTestUtils; -import io.grpc.xds.client.XdsClientImpl; +import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClientMetricReporter; import io.grpc.xds.client.XdsTransportFactory; import java.io.Closeable; @@ -115,7 +116,7 @@ public class XdsDependencyManagerTest { }); private ManagedChannel channel; - private XdsClientImpl xdsClient; + private XdsClient xdsClient; private XdsDependencyManager xdsDependencyManager; private TestWatcher xdsConfigWatcher; private Server xdsServer; @@ -715,6 +716,138 @@ public void testCdsError() throws IOException { assertThat(status.getDescription()).contains(XdsTestUtils.CLUSTER_NAME); } + @Test + public void ldsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsListenerResource.getInstance(), + serverName, + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsListenerResource.getInstance(), serverName, resourceWatcher); + }); + } + + @Test + public void rdsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsRouteConfigureResource.getInstance(), + "RDS", + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS2", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsRouteConfigureResource.getInstance(), serverName, resourceWatcher); + }); + } + + @Test + public void cdsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsClusterResource.getInstance(), + "CDS", + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS2", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsClusterResource.getInstance(), serverName, resourceWatcher); + }); + } + + @Test + public void edsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsEndpointResource.getInstance(), + "EDS", + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsEndpointResource.getInstance(), serverName, resourceWatcher); + }); + } + private Listener buildInlineClientListener(String rdsName, String clusterName) { return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName); }