Skip to content

Commit 53461b6

Browse files
authored
xds: skip DiscoveryRequest for unsubscribed types on stream ready (#12782)
## TL;DR When the bootstrap declares more than one xDS server (e.g. a default server for LDS/CDS plus an authority-specific EDS-only server), grpc-java was sending CDS/LDS DiscoveryRequests to the EDS-only server too. That server replies `UNIMPLEMENTED`, which tears down the stream and EDS data never arrives. Fix: skip DiscoveryRequests for resource types we don't actually subscribe to on a given server. ## Problem Under [A47 — xDS Federation](https://github.com/grpc/proposal/blob/master/A47-xds-federation.md), authorities can declare their own `xds_servers` block in the bootstrap. When an ADS stream is opened to an authority-specific server, `ControlPlaneClient.adjustResourceSubscription` sends a `DiscoveryRequest` for every **globally-subscribed** resource type — even types that have no subscription for *this* server. The empty request still carries a `type_url`, and an authority-specific server (e.g. an EDS-only control plane) may reject it with `UNIMPLEMENTED`, which tears down the entire stream before the legitimate request that follows can complete. ## Reproduce Bootstrap declares two servers — call them control-plane-A (handles LDS/CDS for authority A) and control-plane-B (handles EDS only for authority B). A grpc-java channel that resolves through LDS → CDS in authority A and EDS in authority B opens an ADS stream to control-plane-B. When that stream becomes ready, [`sendDiscoveryRequests`](https://github.com/grpc/grpc-java/blob/master/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java) iterates `resourceStore.getSubscribedResourceTypesWithTypeUrl()` — which returns **all three** types (Listener, Cluster, Endpoint) — and calls `adjustResourceSubscription` for each. For Listener and Cluster, `getSubscribedResources(serverInfo, type)` returns null/empty, but the request is still sent on the wire: ``` io.grpc.StatusRuntimeException: UNAVAILABLE: Error retrieving EDS resource ...: UNIMPLEMENTED. Details: Watches for type type.googleapis.com/envoy.config.cluster.v3.Cluster are not supported in this service ``` The Cluster (CDS) request reaches control-plane-B, gets rejected, and the stream goes into backoff with no EDS data ever delivered. grpc-go on the same bootstrap works fine against the same server, which pointed at the asymmetry. ## Fix In `adjustResourceSubscription`, return early when both: 1. `resources` is `null` (the store reports no subscription for this type on this server), and 2. No DiscoveryRequest of this type has been sent on the current stream (`!adsStream.sentTypes.contains(resourceType)`). Per the `ResourceStore` contract in `XdsClient.java`, a `null` return means "no subscription", while an empty collection means a **wildcard** subscription — a real subscription that must still emit the empty `resource_names` request and start its missing-resource timers. The "DiscoveryRequest of this type has been sent on the current stream" discriminator is tracked in a per-stream `sentTypes` set on `AdsStream`, populated wherever a request is actually transmitted (initial sends, ACKs, NACKs). Per-stream is the right scope because the server's view of our subscriptions is per-stream — on stream replacement the set is cleared implicitly along with the `AdsStream` instance. Gating on `!versions.containsKey(type)` instead would be incorrect because `versions` is only populated on ACK. If a watch is canceled after the initial DiscoveryRequest goes out but before any response is ACKed, that guard would suppress the empty unsubscribe — leaving the server with a stale subscription until the stream resets. Tracking actual sends per-stream closes that window. The unsubscribe-all path (had-version, now-empty/null) is preserved: we still send the empty request and clear the version, telling the server to drop our subscription for that type. ## Mirrors grpc-go This brings grpc-java in line with grpc-go's behavior. The Go equivalent is [`adsStreamImpl.sendExisting`](https://github.com/grpc/grpc-go/blob/v1.80.0/internal/xds/clients/xdsclient/ads_stream.go#L335-L368): ```go for typ, state := range s.resourceTypeState { state.nonce = "" if len(state.subscribedResources) == 0 { continue // <-- explicit skip } names := resourceNames(state.subscribedResources) if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil { return err } s.startWatchTimersLocked(typ, names) } ``` Two structural reasons grpc-go avoids this bug today: 1. The iteration domain is **per-stream** ([`s.resourceTypeState`](https://github.com/grpc/grpc-go/blob/v1.80.0/internal/xds/clients/xdsclient/ads_stream.go#L143)), populated only when [`subscribe`](https://github.com/grpc/grpc-go/blob/v1.80.0/internal/xds/clients/xdsclient/ads_stream.go#L167-L193) is called for a resource on this stream — so a Cluster type never even appears in the iteration of an EDS-only stream. 2. Even within that per-stream iteration, the explicit `if len(state.subscribedResources) == 0 { continue }` covers the case where the type has no subscription on this stream. The grpc-java fix is the equivalent of (2). The `!adsStream.sentTypes.contains` guard is needed because Java's iteration domain is global (`getSubscribedResourceTypesWithTypeUrl` is xds-client-wide), so we may see types we never subscribed to on this stream. Note that grpc-go physically separates two paths: [`sendNewLocked`](https://github.com/grpc/grpc-go/blob/v1.80.0/internal/xds/clients/xdsclient/ads_stream.go#L308-L318) handles runtime sub/unsub and sends every queued request unconditionally (so an empty unsubscribe always goes on the wire, ACK or no ACK), while `sendExisting` handles stream re-establishment and applies the `len == 0` skip. grpc-java has a single `adjustResourceSubscription` function that serves both paths — the per-stream `sentTypes` set is what lets the same guard distinguish them: on a fresh stream `sentTypes` is empty so the guard reduces to "no subscription → skip" (mirroring `sendExisting`), while at runtime after the initial request `sentTypes.contains(type)` is true so the guard does not trigger and the empty unsubscribe is sent (mirroring `sendNewLocked`). ## Test plan Unit tests in `ControlPlaneClientTest`: - `streamReady_skipsEmptyDiscoveryRequestForUnsubscribedType` — the federation case, asserts CDS request is suppressed and EDS still goes through - `streamReady_sendsRequestForAllTypesWhenAllHaveResources` — guards against over-eager skip - `streamReady_skipsTypeWithNoSubscription` — `null` return skips - `streamReady_sendsWildcardRequestAndStartsTimers` — empty collection (wildcard) still sends and starts timers - `cancelBeforeAck_sendsEmptyUnsubscribe` — cancel-before-ACK timing window still emits the unsubscribe ## Spec note xDS SoTW spec ([Envoy xDS protocol](https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol)) treats an empty `resource_names` for LDS/CDS as a wildcard subscription ("send me everything of this type"). The previous grpc-java behavior would unintentionally trigger wildcard CDS subscriptions on every authority-specific stream — which an EDS-only server is right to refuse. Skipping when no subscription exists side-steps that misinterpretation; legitimate wildcard subscriptions (empty collection from a real subscription) still go on the wire as intended, and the existing unsubscribe-all path (with a prior version) continues to work.
1 parent 0ddad69 commit 53461b6

2 files changed

Lines changed: 310 additions & 0 deletions

File tree

xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,31 @@ void adjustResourceSubscription(XdsResourceType<?> resourceType) {
160160
}
161161

162162
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
163+
if (resources == null && !adsStream.sentTypes.contains(resourceType)) {
164+
// No subscription for this type on this server, and we have never sent a DiscoveryRequest
165+
// of this type on the current stream — the server has no subscription state to clear.
166+
//
167+
// Per the ResourceStore contract in XdsClient.java, a null return means "no subscription";
168+
// an empty collection means wildcard subscription, which is a real subscription and must
169+
// not be skipped here.
170+
//
171+
// We track sent types per-stream rather than gating on `versions` because `versions` is
172+
// only populated on ACK. If a watch is canceled after the initial DiscoveryRequest goes
173+
// out but before any response is ACKed, `versions` would still have no entry for the
174+
// type, and gating on it would suppress the empty unsubscribe — leaving the server with
175+
// a stale subscription until the stream resets.
176+
//
177+
// Without this skip, sendDiscoveryRequests() iterates over every globally-subscribed
178+
// resource type when a stream becomes ready and emits an empty DiscoveryRequest for types
179+
// that have no subscription on this server. Per A47 (xDS Federation) servers may be
180+
// authority-specific (e.g. an EDS-only control plane) and reject DiscoveryRequests for
181+
// types they do not handle, tearing down the stream.
182+
//
183+
// Mirrors grpc-go's behavior in
184+
// internal/xds/clients/xdsclient/ads_stream.go:sendExisting, which skips types with no
185+
// subscription.
186+
return;
187+
}
163188
if (resources == null) {
164189
resources = Collections.emptyList();
165190
}
@@ -319,6 +344,11 @@ private class AdsStream implements XdsTransportFactory.EventHandler<DiscoveryRes
319344
// management server should not send a DiscoveryResponse for any DiscoveryRequest that has a
320345
// stale nonce."
321346
private final Map<String, String> respNonces = new HashMap<>();
347+
// Resource types for which a DiscoveryRequest has been sent on this stream. Used by
348+
// adjustResourceSubscription() to decide whether an empty unsubscribe must be sent on the
349+
// wire: the server only has subscription state to clear for types we have actually sent a
350+
// request for on this stream. Cleared implicitly when the stream is replaced.
351+
private final Set<XdsResourceType<?>> sentTypes = new HashSet<>();
322352
private final StreamingCall<DiscoveryRequest, DiscoveryResponse> call;
323353
private final MethodDescriptor<DiscoveryRequest, DiscoveryResponse> methodDescriptor =
324354
AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod();
@@ -358,6 +388,7 @@ void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
358388
}
359389
DiscoveryRequest request = builder.build();
360390
call.sendMessage(request);
391+
sentTypes.add(type);
361392
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
362393
logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", messagePrinter.print(request));
363394
}
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
/*
2+
* Copyright 2026 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.xds.client;
18+
19+
import static com.google.common.truth.Truth.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyString;
22+
import static org.mockito.ArgumentMatchers.argThat;
23+
import static org.mockito.Mockito.atLeastOnce;
24+
import static org.mockito.Mockito.never;
25+
import static org.mockito.Mockito.times;
26+
import static org.mockito.Mockito.verify;
27+
import static org.mockito.Mockito.when;
28+
29+
import com.google.common.base.Stopwatch;
30+
import com.google.common.collect.ImmutableList;
31+
import com.google.common.collect.ImmutableMap;
32+
import com.google.common.collect.ImmutableSet;
33+
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
34+
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
35+
import io.grpc.InsecureChannelCredentials;
36+
import io.grpc.MethodDescriptor;
37+
import io.grpc.SynchronizationContext;
38+
import io.grpc.internal.BackoffPolicy;
39+
import io.grpc.internal.FakeClock;
40+
import io.grpc.xds.client.Bootstrapper.ServerInfo;
41+
import io.grpc.xds.client.EnvoyProtoData.Node;
42+
import io.grpc.xds.client.XdsClient.ResourceStore;
43+
import io.grpc.xds.client.XdsClient.XdsResponseHandler;
44+
import io.grpc.xds.client.XdsTransportFactory.EventHandler;
45+
import io.grpc.xds.client.XdsTransportFactory.StreamingCall;
46+
import io.grpc.xds.client.XdsTransportFactory.XdsTransport;
47+
import java.util.Collections;
48+
import java.util.Map;
49+
import org.junit.Before;
50+
import org.junit.Rule;
51+
import org.junit.Test;
52+
import org.junit.runner.RunWith;
53+
import org.junit.runners.JUnit4;
54+
import org.mockito.ArgumentCaptor;
55+
import org.mockito.Mock;
56+
import org.mockito.junit.MockitoJUnit;
57+
import org.mockito.junit.MockitoRule;
58+
59+
/** Unit tests for {@link ControlPlaneClient}. */
60+
@RunWith(JUnit4.class)
61+
public class ControlPlaneClientTest {
62+
63+
private static final String CDS_TYPE_URL = "type.googleapis.com/envoy.config.cluster.v3.Cluster";
64+
private static final String EDS_TYPE_URL =
65+
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
66+
67+
private final SynchronizationContext syncContext =
68+
new SynchronizationContext((t, e) -> {
69+
throw new AssertionError("Uncaught exception in sync context", e);
70+
});
71+
private final FakeClock fakeClock = new FakeClock();
72+
private final ServerInfo serverInfo =
73+
ServerInfo.create("eds-control-plane:8443", InsecureChannelCredentials.create());
74+
private final Node bootstrapNode = Node.newBuilder().setId("test-node").build();
75+
76+
@Mock private XdsTransport xdsTransport;
77+
@Mock private StreamingCall<DiscoveryRequest, DiscoveryResponse> streamingCall;
78+
@Mock private XdsResponseHandler responseHandler;
79+
@Mock private ResourceStore resourceStore;
80+
@Mock private BackoffPolicy.Provider backoffPolicyProvider;
81+
@Mock private MessagePrettyPrinter messagePrinter;
82+
@Mock private XdsResourceType<?> cdsType;
83+
@Mock private XdsResourceType<?> edsType;
84+
85+
@Rule public final MockitoRule mocks = MockitoJUnit.rule();
86+
87+
private ControlPlaneClient cpc;
88+
private ArgumentCaptor<EventHandler<DiscoveryResponse>> handlerCaptor;
89+
90+
@Before
91+
@SuppressWarnings("unchecked")
92+
public void setUp() {
93+
when(cdsType.typeUrl()).thenReturn(CDS_TYPE_URL);
94+
when(cdsType.typeName()).thenReturn("CDS");
95+
when(edsType.typeUrl()).thenReturn(EDS_TYPE_URL);
96+
when(edsType.typeName()).thenReturn("EDS");
97+
98+
when(xdsTransport.<DiscoveryRequest, DiscoveryResponse>createStreamingCall(
99+
anyString(),
100+
any(MethodDescriptor.Marshaller.class),
101+
any(MethodDescriptor.Marshaller.class)))
102+
.thenReturn(streamingCall);
103+
when(streamingCall.isReady()).thenReturn(true);
104+
105+
handlerCaptor = ArgumentCaptor.forClass(EventHandler.class);
106+
107+
cpc = new ControlPlaneClient(
108+
xdsTransport,
109+
serverInfo,
110+
bootstrapNode,
111+
responseHandler,
112+
resourceStore,
113+
fakeClock.getScheduledExecutorService(),
114+
syncContext,
115+
backoffPolicyProvider,
116+
() -> Stopwatch.createUnstarted(fakeClock.getTicker()),
117+
messagePrinter);
118+
}
119+
120+
/**
121+
* Reproduces the bug where, when an ADS stream is opened to an authority-specific server (e.g.
122+
* an EDS-only control plane), {@code sendDiscoveryRequests} previously emitted an empty
123+
* DiscoveryRequest for every globally-subscribed resource type — including types this server
124+
* does not handle. Authority-specific servers may reject those requests with UNIMPLEMENTED and
125+
* tear down the stream, blocking the legitimate request that follows.
126+
*
127+
* <p>Asserts that the empty CDS request is suppressed and only the EDS request (which has
128+
* resources for this server) goes on the wire.
129+
*/
130+
@Test
131+
public void streamReady_skipsEmptyDiscoveryRequestForUnsubscribedType() {
132+
// CDS is globally subscribed (e.g. against a different authority) but has no resources on
133+
// this server. EDS has one resource on this server.
134+
Map<String, XdsResourceType<?>> subscribedTypes =
135+
ImmutableMap.of(CDS_TYPE_URL, cdsType, EDS_TYPE_URL, edsType);
136+
when(resourceStore.getSubscribedResourceTypesWithTypeUrl()).thenReturn(subscribedTypes);
137+
when(resourceStore.getSubscribedResources(serverInfo, cdsType)).thenReturn(null);
138+
when(resourceStore.getSubscribedResources(serverInfo, edsType))
139+
.thenReturn(ImmutableList.of("foo-endpoint"));
140+
141+
// Triggers stream creation and registers the EventHandler.
142+
syncContext.execute(cpc::sendDiscoveryRequests);
143+
verify(streamingCall).start(handlerCaptor.capture());
144+
145+
// Drive the stream into the connected state. onReady() flips sentInitialRequest=true and
146+
// re-invokes sendDiscoveryRequests, which iterates the globally-subscribed types.
147+
handlerCaptor.getValue().onReady();
148+
149+
// EDS request was sent with the one resource for this server.
150+
ArgumentCaptor<DiscoveryRequest> sent = ArgumentCaptor.forClass(DiscoveryRequest.class);
151+
verify(streamingCall, atLeastOnce()).sendMessage(sent.capture());
152+
ImmutableSet<String> sentTypes = sent.getAllValues().stream()
153+
.map(DiscoveryRequest::getTypeUrl)
154+
.collect(ImmutableSet.toImmutableSet());
155+
assertThat(sentTypes).contains(EDS_TYPE_URL);
156+
assertThat(sentTypes).doesNotContain(CDS_TYPE_URL);
157+
158+
// Confirm the EDS request actually carried the resource name.
159+
DiscoveryRequest edsReq = sent.getAllValues().stream()
160+
.filter(r -> r.getTypeUrl().equals(EDS_TYPE_URL))
161+
.findFirst()
162+
.orElseThrow(() -> new AssertionError("EDS request not sent"));
163+
assertThat(edsReq.getResourceNamesList()).containsExactly("foo-endpoint");
164+
}
165+
166+
/**
167+
* If a server has resources for every globally-subscribed type, the empty-skip guard is a
168+
* no-op: a DiscoveryRequest is sent for every type. This guards against the skip becoming
169+
* over-eager and dropping legitimate subscriptions.
170+
*/
171+
@Test
172+
public void streamReady_sendsRequestForAllTypesWhenAllHaveResources() {
173+
Map<String, XdsResourceType<?>> subscribedTypes =
174+
ImmutableMap.of(CDS_TYPE_URL, cdsType, EDS_TYPE_URL, edsType);
175+
when(resourceStore.getSubscribedResourceTypesWithTypeUrl()).thenReturn(subscribedTypes);
176+
when(resourceStore.getSubscribedResources(serverInfo, cdsType))
177+
.thenReturn(ImmutableList.of("foo-cluster"));
178+
when(resourceStore.getSubscribedResources(serverInfo, edsType))
179+
.thenReturn(ImmutableList.of("foo-endpoint"));
180+
181+
syncContext.execute(cpc::sendDiscoveryRequests);
182+
verify(streamingCall).start(handlerCaptor.capture());
183+
handlerCaptor.getValue().onReady();
184+
185+
ArgumentCaptor<DiscoveryRequest> sent = ArgumentCaptor.forClass(DiscoveryRequest.class);
186+
verify(streamingCall, times(2)).sendMessage(sent.capture());
187+
ImmutableSet<String> sentTypes = sent.getAllValues().stream()
188+
.map(DiscoveryRequest::getTypeUrl)
189+
.collect(ImmutableSet.toImmutableSet());
190+
assertThat(sentTypes).containsExactly(CDS_TYPE_URL, EDS_TYPE_URL);
191+
}
192+
193+
/**
194+
* If only one type has a subscription on this server, no request is sent for the unsubscribed
195+
* type. This is the canonical multi-authority federation case (e.g. fabric authority owns CDS,
196+
* eds-control-plane owns EDS — the eds-control-plane stream should only see EDS).
197+
*/
198+
@Test
199+
public void streamReady_skipsTypeWithNoSubscription() {
200+
Map<String, XdsResourceType<?>> subscribedTypes =
201+
ImmutableMap.of(CDS_TYPE_URL, cdsType, EDS_TYPE_URL, edsType);
202+
when(resourceStore.getSubscribedResourceTypesWithTypeUrl()).thenReturn(subscribedTypes);
203+
when(resourceStore.getSubscribedResources(serverInfo, cdsType)).thenReturn(null);
204+
when(resourceStore.getSubscribedResources(serverInfo, edsType))
205+
.thenReturn(ImmutableList.of("foo-endpoint"));
206+
207+
syncContext.execute(cpc::sendDiscoveryRequests);
208+
verify(streamingCall).start(handlerCaptor.capture());
209+
handlerCaptor.getValue().onReady();
210+
211+
verify(streamingCall, never()).sendMessage(
212+
argThatTypeUrlIs(CDS_TYPE_URL));
213+
verify(streamingCall).sendMessage(argThatTypeUrlIs(EDS_TYPE_URL));
214+
}
215+
216+
/**
217+
* Per the ResourceStore contract in XdsClient.java, an empty collection from
218+
* getSubscribedResources indicates a wildcard subscription. The skip-on-empty guard must not
219+
* suppress wildcard requests on initial stream ready — the server needs the empty-resource-list
220+
* DiscoveryRequest to start streaming, and the watcher's missing-resource timers must start.
221+
*/
222+
@Test
223+
public void streamReady_sendsWildcardRequestAndStartsTimers() {
224+
Map<String, XdsResourceType<?>> subscribedTypes = ImmutableMap.of(CDS_TYPE_URL, cdsType);
225+
when(resourceStore.getSubscribedResourceTypesWithTypeUrl()).thenReturn(subscribedTypes);
226+
// Empty collection == wildcard subscription per the ResourceStore contract.
227+
when(resourceStore.getSubscribedResources(serverInfo, cdsType))
228+
.thenReturn(Collections.emptyList());
229+
230+
syncContext.execute(cpc::sendDiscoveryRequests);
231+
verify(streamingCall).start(handlerCaptor.capture());
232+
handlerCaptor.getValue().onReady();
233+
234+
ArgumentCaptor<DiscoveryRequest> sent = ArgumentCaptor.forClass(DiscoveryRequest.class);
235+
verify(streamingCall, atLeastOnce()).sendMessage(sent.capture());
236+
DiscoveryRequest cdsReq = sent.getAllValues().stream()
237+
.filter(r -> r.getTypeUrl().equals(CDS_TYPE_URL))
238+
.findFirst()
239+
.orElseThrow(() -> new AssertionError("CDS wildcard request not sent"));
240+
assertThat(cdsReq.getResourceNamesList()).isEmpty();
241+
242+
verify(resourceStore).startMissingResourceTimers(Collections.emptyList(), cdsType);
243+
}
244+
245+
/**
246+
* If a watch is canceled after the initial DiscoveryRequest goes out but before any response
247+
* is ACKed, the empty unsubscribe must still be sent — otherwise the server keeps the stale
248+
* subscription until the stream resets. The skip guard must gate on per-stream send history,
249+
* not on the {@code versions} map (which is only populated on ACK).
250+
*/
251+
@Test
252+
public void cancelBeforeAck_sendsEmptyUnsubscribe() {
253+
Map<String, XdsResourceType<?>> subscribedTypes = ImmutableMap.of(CDS_TYPE_URL, cdsType);
254+
when(resourceStore.getSubscribedResourceTypesWithTypeUrl()).thenReturn(subscribedTypes);
255+
when(resourceStore.getSubscribedResources(serverInfo, cdsType))
256+
.thenReturn(ImmutableList.of("foo-cluster"));
257+
258+
syncContext.execute(cpc::sendDiscoveryRequests);
259+
verify(streamingCall).start(handlerCaptor.capture());
260+
handlerCaptor.getValue().onReady();
261+
262+
// Initial DiscoveryRequest with the resource went out. No DiscoveryResponse has been ACKed.
263+
verify(streamingCall).sendMessage(argThatTypeUrlIs(CDS_TYPE_URL));
264+
265+
// Cancel the watch before any response arrives: store now reports no subscription.
266+
when(resourceStore.getSubscribedResources(serverInfo, cdsType)).thenReturn(null);
267+
syncContext.execute(() -> cpc.adjustResourceSubscription(cdsType));
268+
269+
ArgumentCaptor<DiscoveryRequest> sent = ArgumentCaptor.forClass(DiscoveryRequest.class);
270+
verify(streamingCall, times(2)).sendMessage(sent.capture());
271+
DiscoveryRequest unsub = sent.getAllValues().get(1);
272+
assertThat(unsub.getTypeUrl()).isEqualTo(CDS_TYPE_URL);
273+
assertThat(unsub.getResourceNamesList()).isEmpty();
274+
}
275+
276+
private static DiscoveryRequest argThatTypeUrlIs(String typeUrl) {
277+
return argThat(req -> req != null && typeUrl.equals(req.getTypeUrl()));
278+
}
279+
}

0 commit comments

Comments
 (0)