Skip to content

Commit bc65b77

Browse files
committed
xds: XdsDepManager should ignore updates after shutdown
This prevents an NPE and subsequent channel panic when trying to build a config (because there are no watchers, so waitingOnResource==false) without any listener and route.

```
java.lang.NullPointerException: Cannot invoke "io.grpc.xds.XdsDependencyManager$RdsUpdateSupplier.getRdsUpdate()" because "routeSource" is null
    at io.grpc.xds.XdsDependencyManager.buildUpdate(XdsDependencyManager.java:295)
    at io.grpc.xds.XdsDependencyManager.maybePublishConfig(XdsDependencyManager.java:266)
    at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:899)
    at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:888)
    at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.notifyWatcher(XdsClientImpl.java:929)
    at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.lambda$onData$0(XdsClientImpl.java:837)
    at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:96)
```

I think this fully fixes the problem today, but not tomorrow. subscribeToCluster() is racy as well, but not yet used.

This was noticed when idleTimeout was firing, with some other code calling getState(true) to wake the channel back up. That may have made this panic more visible than it would be otherwise, but that has not been investigated.

b/412474567
1 parent 6cd007d commit bc65b77

2 files changed

Lines changed: 65 additions & 2 deletions

File tree

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T>
199199
for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
200200
xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
201201
watcherEntry.getValue());
202+
watcherEntry.getValue().cancelled = true;
202203
}
203204
}
204205

@@ -591,6 +592,9 @@ private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
591592
@Override
592593
public void onError(Status error) {
593594
checkNotNull(error, "error");
595+
if (cancelled) {
596+
return;
597+
}
594598
// Don't update configuration on error, if we've already received configuration
595599
if (!hasDataValue()) {
596600
setDataAsStatus(Status.UNAVAILABLE.withDescription(
@@ -659,6 +663,9 @@ private LdsWatcher(String resourceName) {
659663
@Override
660664
public void onChanged(XdsListenerResource.LdsUpdate update) {
661665
checkNotNull(update, "update");
666+
if (cancelled) {
667+
return;
668+
}
662669

663670
HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
664671
List<VirtualHost> virtualHosts;
@@ -787,6 +794,9 @@ public RdsWatcher(String resourceName) {
787794
@Override
788795
public void onChanged(RdsUpdate update) {
789796
checkNotNull(update, "update");
797+
if (cancelled) {
798+
return;
799+
}
790800
List<VirtualHost> oldVirtualHosts = hasDataValue()
791801
? getData().getValue().virtualHosts
792802
: Collections.emptyList();
@@ -815,6 +825,9 @@ private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
815825
@Override
816826
public void onChanged(XdsClusterResource.CdsUpdate update) {
817827
checkNotNull(update, "update");
828+
if (cancelled) {
829+
return;
830+
}
818831
switch (update.clusterType()) {
819832
case EDS:
820833
setData(update);
@@ -895,6 +908,9 @@ private EdsWatcher(String resourceName, CdsWatcher parentContext) {
895908

896909
@Override
897910
public void onChanged(XdsEndpointResource.EdsUpdate update) {
911+
if (cancelled) {
912+
return;
913+
}
898914
setData(checkNotNull(update, "update"));
899915
maybePublishConfig();
900916
}

xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
import com.google.common.collect.ImmutableMap;
4343
import com.google.common.collect.ImmutableSet;
44+
import com.google.common.util.concurrent.MoreExecutors;
4445
import com.google.protobuf.Message;
4546
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
4647
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
@@ -65,7 +66,7 @@
6566
import io.grpc.xds.XdsConfig.XdsClusterConfig;
6667
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
6768
import io.grpc.xds.client.CommonBootstrapperTestUtils;
68-
import io.grpc.xds.client.XdsClientImpl;
69+
import io.grpc.xds.client.XdsClient;
6970
import io.grpc.xds.client.XdsClientMetricReporter;
7071
import io.grpc.xds.client.XdsTransportFactory;
7172
import java.io.Closeable;
@@ -115,7 +116,7 @@ public class XdsDependencyManagerTest {
115116
});
116117

117118
private ManagedChannel channel;
118-
private XdsClientImpl xdsClient;
119+
private XdsClient xdsClient;
119120
private XdsDependencyManager xdsDependencyManager;
120121
private TestWatcher xdsConfigWatcher;
121122
private Server xdsServer;
@@ -715,6 +716,52 @@ public void testCdsError() throws IOException {
715716
assertThat(status.getDescription()).contains(XdsTestUtils.CLUSTER_NAME);
716717
}
717718

719+
@Test
720+
public void updatesAfterShutdown() {
721+
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
722+
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
723+
724+
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
725+
serverName, serverName, nameResolverArgs, scheduler);
726+
727+
verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
728+
729+
@SuppressWarnings("unchecked")
730+
XdsClient.ResourceWatcher<XdsListenerResource.LdsUpdate> serverNameWatcher =
731+
mock(XdsClient.ResourceWatcher.class);
732+
xdsClient.watchXdsResource(
733+
XdsListenerResource.getInstance(),
734+
serverName + "2",
735+
serverNameWatcher,
736+
MoreExecutors.directExecutor());
737+
738+
syncContext.execute(() -> {
739+
// Shutdown before any updates. This will unsubscribe from XdsClient, but only after this
740+
// Runnable returns
741+
xdsDependencyManager.shutdown();
742+
743+
// Cause an onChanged() for each type, and maybe onResourceDoesNotExist(), going from EDS up
744+
// the tree since updates won't be processed immediately by the dependency manager (we're
745+
// blocking the sync context)
746+
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
747+
ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT);
748+
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS2",
749+
ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT);
750+
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS2", "EDS2",
751+
ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT);
752+
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS2", "EDS2",
753+
ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT);
754+
XdsTestUtils.setAdsConfig(controlPlaneService, serverName + "2", "RDS2", "CDS2", "EDS2",
755+
ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT);
756+
});
757+
758+
// Wait for the prior updates to be processed by XdsClient. This can't be done in the
759+
// syncContext as flow control prevents further updates until previous callbacks have completed.
760+
verify(serverNameWatcher, timeout(5000)).onChanged(any());
761+
xdsClient.cancelXdsResourceWatch(
762+
XdsListenerResource.getInstance(), serverName + "2", serverNameWatcher);
763+
}
764+
718765
private Listener buildInlineClientListener(String rdsName, String clusterName) {
719766
return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName);
720767
}

0 commit comments

Comments
 (0)