Skip to content

Commit 01e667e

Browse files
fix(sse): honor disconnect/dispose and cap error-path reconnects
SSE auto-reconnect ignored disconnectSSE and screen dispose because _manuallyDisconnected was never checked before scheduling reconnects, and dispose() cleared that set before delayed callbacks could run. The error handler also passed reconnectAttempts by value, so the outer counter never advanced and maxReconnectAttempts was bypassed on errors. Guard reconnect with _shouldReconnect (dispose + manual disconnect) and share attempt state via a list so error-driven retries respect the cap. Co-authored-by: Sharjeel Yunus <sharjeelyunus@users.noreply.github.com>
1 parent 1718c91 commit 01e667e

2 files changed

Lines changed: 85 additions & 22 deletions

File tree

modules/ensemble/lib/framework/apiproviders/sse_api_provider.dart

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class SSEAPIProvider extends APIProvider with LiveAPIProvider {
2727
final Map<String, StreamSubscription> _subscriptions = {};
2828
final Map<String, http.Client> _activeClients = {};
2929
final Set<String> _manuallyDisconnected = {};
30+
bool _disposed = false;
3031

3132
@override
3233
Future<void> init(String appId, Map<String, dynamic> config) async {
@@ -206,7 +207,7 @@ class SSEAPIProvider extends APIProvider with LiveAPIProvider {
206207
ResponseListener listener,
207208
SSEOptions options,
208209
DataContext eContext) async {
209-
int reconnectAttempts = 0;
210+
final List<int> reconnectAttempts = [0];
210211
String? lastEventId;
211212

212213
Future<void> connect() async {
@@ -236,7 +237,7 @@ class SSEAPIProvider extends APIProvider with LiveAPIProvider {
236237
}
237238

238239
// Reset reconnect attempts on successful connection
239-
reconnectAttempts = 0;
240+
reconnectAttempts[0] = 0;
240241

241242
// Parse SSE stream
242243
String? currentEventType;
@@ -296,18 +297,20 @@ class SSEAPIProvider extends APIProvider with LiveAPIProvider {
296297
onError: (error) {
297298
log('SSE stream error: $error');
298299
_handleSSEError(error, apiName, listener, options,
299-
reconnectAttempts, () => connect(), url, headers, eContext);
300+
reconnectAttempts, () => connect());
300301
},
301302
onDone: () {
302303
log('SSE stream closed');
303304
// Attempt reconnection if enabled
304-
if (options.autoReconnect &&
305-
reconnectAttempts < options.maxReconnectAttempts) {
306-
reconnectAttempts++;
305+
if (_shouldReconnect(apiName, options, reconnectAttempts[0])) {
306+
reconnectAttempts[0]++;
307307
final delay = Duration(
308-
milliseconds: options.reconnectDelay * reconnectAttempts);
308+
milliseconds: options.reconnectDelay * reconnectAttempts[0]);
309309
Future.delayed(delay, () {
310-
log('Reconnecting SSE (attempt $reconnectAttempts)...');
310+
if (!_shouldReconnect(apiName, options, reconnectAttempts[0])) {
311+
return;
312+
}
313+
log('Reconnecting SSE (attempt ${reconnectAttempts[0]})...');
311314
connect();
312315
});
313316
} else {
@@ -328,7 +331,7 @@ class SSEAPIProvider extends APIProvider with LiveAPIProvider {
328331
_subscriptions[apiName] = subscription;
329332
} catch (error) {
330333
_handleSSEError(error, apiName, listener, options, reconnectAttempts,
331-
() => connect(), url, headers, eContext);
334+
() => connect());
332335
}
333336
}
334337

@@ -374,16 +377,20 @@ class SSEAPIProvider extends APIProvider with LiveAPIProvider {
374377
}
375378
}
376379

380+
bool _shouldReconnect(
381+
String apiName, SSEOptions options, int reconnectAttempts) =>
382+
!_disposed &&
383+
!_manuallyDisconnected.contains(apiName) &&
384+
options.autoReconnect &&
385+
reconnectAttempts < options.maxReconnectAttempts;
386+
377387
void _handleSSEError(
378388
dynamic error,
379389
String apiName,
380390
ResponseListener listener,
381391
SSEOptions options,
382-
int reconnectAttempts,
383-
VoidCallback reconnect,
384-
String url,
385-
Map<String, String> headers,
386-
DataContext eContext) {
392+
List<int> reconnectAttempts,
393+
VoidCallback reconnect) {
387394
String errorMessage;
388395
if (error is HandshakeException || error is TlsException) {
389396
errorMessage =
@@ -408,13 +415,15 @@ class SSEAPIProvider extends APIProvider with LiveAPIProvider {
408415
listener(errorResponse);
409416

410417
// Attempt reconnection if enabled
411-
if (options.autoReconnect &&
412-
reconnectAttempts < options.maxReconnectAttempts) {
413-
reconnectAttempts++;
414-
final delay =
415-
Duration(milliseconds: options.reconnectDelay * reconnectAttempts);
418+
if (_shouldReconnect(apiName, options, reconnectAttempts[0])) {
419+
reconnectAttempts[0]++;
420+
final delay = Duration(
421+
milliseconds: options.reconnectDelay * reconnectAttempts[0]);
416422
Future.delayed(delay, () {
417-
log('Reconnecting SSE after error (attempt $reconnectAttempts)...');
423+
if (!_shouldReconnect(apiName, options, reconnectAttempts[0])) {
424+
return;
425+
}
426+
log('Reconnecting SSE after error (attempt ${reconnectAttempts[0]})...');
418427
reconnect();
419428
});
420429
}
@@ -530,6 +539,7 @@ class SSEAPIProvider extends APIProvider with LiveAPIProvider {
530539

531540
@override
532541
dispose() {
542+
_disposed = true;
533543
for (final apiName in _subscriptions.keys.toList()) {
534544
_manuallyDisconnected.add(apiName);
535545
}
@@ -543,10 +553,13 @@ class SSEAPIProvider extends APIProvider with LiveAPIProvider {
543553
client.close();
544554
}
545555
_activeClients.clear();
546-
547-
_manuallyDisconnected.clear();
548556
}
549557

558+
@visibleForTesting
559+
bool shouldReconnectForTesting(
560+
String apiName, SSEOptions options, int reconnectAttempts) =>
561+
_shouldReconnect(apiName, options, reconnectAttempts);
562+
550563
@visibleForTesting
551564
int get subscriptionCountForTesting => _subscriptions.length;
552565

modules/ensemble/test/sse_provider_dispose_test.dart

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,54 @@ void main() {
4949
expect(identical(providers.getProvider('sse'), sse), isTrue);
5050
});
5151
});
52+
53+
group('SSEAPIProvider reconnect guards', () {
54+
test('disconnect prevents auto-reconnect', () async {
55+
final provider = SSEAPIProvider();
56+
await provider.disconnect('liveFeed');
57+
58+
expect(
59+
provider.shouldReconnectForTesting('liveFeed', SSEOptions(), 0),
60+
isFalse,
61+
);
62+
});
63+
64+
test('dispose prevents auto-reconnect', () {
65+
final provider = SSEAPIProvider();
66+
provider.dispose();
67+
68+
expect(
69+
provider.shouldReconnectForTesting('liveFeed', SSEOptions(), 0),
70+
isFalse,
71+
);
72+
});
73+
74+
test('honors maxReconnectAttempts', () {
75+
final provider = SSEAPIProvider();
76+
final options = SSEOptions(maxReconnectAttempts: 3);
77+
78+
expect(provider.shouldReconnectForTesting('api', options, 0), isTrue);
79+
expect(provider.shouldReconnectForTesting('api', options, 2), isTrue);
80+
expect(provider.shouldReconnectForTesting('api', options, 3), isFalse);
81+
});
82+
83+
test('shared reconnect counter stops after max error retries', () {
84+
final provider = SSEAPIProvider();
85+
final options = SSEOptions(maxReconnectAttempts: 3);
86+
final attempts = <int>[0];
87+
88+
for (var i = 0; i < 3; i++) {
89+
expect(
90+
provider.shouldReconnectForTesting('api', options, attempts[0]),
91+
isTrue,
92+
);
93+
attempts[0]++;
94+
}
95+
96+
expect(
97+
provider.shouldReconnectForTesting('api', options, attempts[0]),
98+
isFalse,
99+
);
100+
});
101+
});
52102
}

0 commit comments

Comments
 (0)