@@ -32,6 +32,19 @@ class DTDManager {
3232 Uri ? get uri => _uri;
3333 Uri ? _uri;
3434
35+ /// A stream of [CoreDtdServiceConstants.serviceRegisteredKind] and
36+ /// [CoreDtdServiceConstants.serviceUnregisteredKind] events.
37+ ///
38+ /// Since this is a broadcast stream, it supports multiple subscribers.
39+ Stream <DTDEvent > get serviceRegistrationBroadcastStream =>
40+ _serviceRegistrationController.stream;
41+ final _serviceRegistrationController = StreamController <DTDEvent >.broadcast ();
42+
43+ /// The subscription to the current service registration stream.
44+ ///
45+ /// This is canceled and reset with the DTD connection changes.
46+ StreamSubscription <DTDEvent >? _currentServiceRegistrationSubscription;
47+
3548 /// Whether or not to automatically reconnect if disconnected.
3649 ///
3750 /// This will happen by default as long as the disconnect wasn't
@@ -74,7 +87,7 @@ class DTDManager {
7487 }
7588
7689 /// Triggers a reconnect to the last connected URI if the current state is
77- /// [ConnectionFailedDTDState] (and there was a pervious connection).
90+ /// [ConnectionFailedDTDState] (and there was a previous connection).
7891 Future <void > reconnect () {
7992 final reconnectFunc = _lastConnectFunc;
8093 if (_connectionState.value is ! ConnectionFailedDTDState ||
@@ -149,11 +162,19 @@ class DTDManager {
149162
150163 try {
151164 final connection = await _connectWithRetries (uri, maxRetries: maxRetries);
165+ await _listenForServiceRegistrationEvents (connection);
152166
167+ // Save the previous connection so that we can close it after the new
168+ // connection is reestablished.
169+ final previousConnection = _connection.value;
153170 _uri = uri;
154171 // Set this after setting the value of [_uri] so that [_uri] can be used
155172 // by any listeners of the [_connection] notifier.
156173 _connection.value = connection;
174+ // Close the previous connection.
175+ if (previousConnection != null ) {
176+ await previousConnection.close ();
177+ }
157178 _connectionState.value = ConnectedDTDState ();
158179 _log.info ('Successfully connected to DTD at: $uri ' );
159180
@@ -230,16 +251,17 @@ class DTDManager {
230251 // an explicit disconnect.
231252 _automaticallyReconnect = false ;
232253
233- // We only clear the connection if we are explicitly disconnecting. In the
234- // case where the connection just dropped, we leave it so that we can
235- // continue to render a page (usually with an overlay).
254+ // We only close and clear the connection if we are explicitly
255+ // disconnecting. In the case where the connection just dropped, we leave
256+ // it so that we can continue to render a page (usually with an overlay).
257+ // We only close it once the new connection is established.
258+ if (_connection.value case final connection? ) {
259+ await connection.close ();
260+ }
236261 _connection.value = null ;
237262 }
238263
239264 _periodicConnectionCheck? .cancel ();
240- if (_connection.value case final connection? ) {
241- await connection.close ();
242- }
243265
244266 _connectionState.value = NotConnectedDTDState ();
245267 _uri = null ;
@@ -249,9 +271,47 @@ class DTDManager {
249271
250272 Future <void > dispose () async {
251273 await disconnect ();
274+ await _currentServiceRegistrationSubscription? .cancel ();
275+ await _serviceRegistrationController.close ();
252276 _connection.dispose ();
253277 }
254278
279+ /// Listens for service registration events on the [dtd] connection.
280+ Future <void > _listenForServiceRegistrationEvents (
281+ DartToolingDaemon dtd) async {
282+ // Note: We immediately begin listening for service registration events on
283+ // on the new DTD connection before canceling the previous subscription.
284+ // This guarantees that we don't miss any events across reconnects.
285+ // ignore: cancel_subscriptions, false positive, it is canceled below.
286+ final nextServiceRegistrationSubscription = dtd
287+ .onEvent (CoreDtdServiceConstants .servicesStreamId)
288+ .listen (_forwardServiceRegistrationEvents,
289+ onError: _logServiceStreamError);
290+ await dtd.streamListen (CoreDtdServiceConstants .servicesStreamId);
291+
292+ // Cancel the previous subscription.
293+ await _currentServiceRegistrationSubscription? .cancel ();
294+ _currentServiceRegistrationSubscription =
295+ nextServiceRegistrationSubscription;
296+ }
297+
298+ /// Forwards service registration events to the
299+ /// [_serviceRegistrationController] .
300+ void _forwardServiceRegistrationEvents (DTDEvent event) {
301+ final kind = event.kind;
302+ final isRegistrationEvent =
303+ kind == CoreDtdServiceConstants .serviceRegisteredKind ||
304+ kind == CoreDtdServiceConstants .serviceUnregisteredKind;
305+
306+ if (isRegistrationEvent) {
307+ _serviceRegistrationController.add (event);
308+ }
309+ }
310+
311+ void _logServiceStreamError (Object error) {
312+ _log.warning ('Error in DTD service stream' , error);
313+ }
314+
255315 /// Returns the workspace roots for the Dart Tooling Daemon connection.
256316 ///
257317 /// These roots are set by the tool that started DTD, which may be the IDE,
0 commit comments