Skip to content

Commit 50743eb

Browse files
committed
fix: add max retry
1 parent 89a7036 commit 50743eb

1 file changed

Lines changed: 113 additions & 126 deletions

File tree

lib/flutter_client_sse.dart

Lines changed: 113 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -2,157 +2,130 @@ library flutter_client_sse;
22

33
import 'dart:async';
44
import 'dart:convert';
5+
56
import 'package:flutter_client_sse/constants/sse_request_type_enum.dart';
67
import 'package:http/http.dart' as http;
8+
79
part 'sse_event_model.dart';
810

9-
/// A client for subscribing to Server-Sent Events (SSE).
1011
class SSEClient {
11-
static http.Client _client = new http.Client();
12-
13-
/// Retry the SSE connection after a delay.
14-
///
15-
/// [method] is the request method (GET or POST).
16-
/// [url] is the URL of the SSE endpoint.
17-
/// [header] is a map of request headers.
18-
/// [body] is an optional request body for POST requests.
19-
/// [streamController] is required to persist the stream from the old connection
20-
static void _retryConnection(
21-
{required SSERequestType method,
22-
required String url,
23-
required Map<String, String> header,
24-
required StreamController<SSEModel> streamController,
25-
Map<String, dynamic>? body}) {
26-
print('---RETRY CONNECTION---');
12+
static http.Client _client = http.Client();
13+
14+
static void _retryConnection({
15+
required SSERequestType method,
16+
required String url,
17+
required Map<String, String> header,
18+
required StreamController<SSEModel> streamController,
19+
Map<String, dynamic>? body,
20+
int? retriesLeft,
21+
required int maxRetries,
22+
}) {
23+
if (retriesLeft == null || retriesLeft <= 0) {
24+
print('---MAX RETRY LIMIT REACHED---');
25+
return;
26+
}
27+
28+
print('---RETRY CONNECTION ($retriesLeft retries left)---');
2729
Future.delayed(Duration(seconds: 5), () {
2830
subscribeToSSE(
2931
method: method,
3032
url: url,
3133
header: header,
3234
body: body,
3335
oldStreamController: streamController,
36+
maxRetries: maxRetries,
37+
retriesLeft: retriesLeft - 1,
3438
);
3539
});
3640
}
3741

38-
/// Subscribe to Server-Sent Events.
39-
///
40-
/// [method] is the request method (GET or POST).
41-
/// [url] is the URL of the SSE endpoint.
42-
/// [header] is a map of request headers.
43-
/// [body] is an optional request body for POST requests.
44-
///
45-
/// Returns a [Stream] of [SSEModel] representing the SSE events.
46-
static Stream<SSEModel> subscribeToSSE(
47-
{required SSERequestType method,
48-
required String url,
49-
required Map<String, String> header,
50-
StreamController<SSEModel>? oldStreamController,
51-
Map<String, dynamic>? body}) {
52-
StreamController<SSEModel> streamController = StreamController();
53-
if (oldStreamController != null) {
54-
streamController = oldStreamController;
55-
}
42+
static Stream<SSEModel> subscribeToSSE({
43+
required SSERequestType method,
44+
required String url,
45+
required Map<String, String> header,
46+
StreamController<SSEModel>? oldStreamController,
47+
Map<String, dynamic>? body,
48+
int maxRetries = 5,
49+
int? retriesLeft,
50+
}) {
51+
StreamController<SSEModel> streamController = oldStreamController ?? StreamController();
52+
retriesLeft ??= maxRetries;
53+
5654
var lineRegex = RegExp(r'^([^:]*)(?::)?(?: )?(.*)?$');
5755
var currentSSEModel = SSEModel(data: '', id: '', event: '');
5856
print("--SUBSCRIBING TO SSE---");
59-
while (true) {
60-
try {
61-
_client = http.Client();
62-
var request = new http.Request(
63-
method == SSERequestType.GET ? "GET" : "POST",
64-
Uri.parse(url),
65-
);
6657

67-
/// Adding headers to the request
68-
header.forEach((key, value) {
69-
request.headers[key] = value;
70-
});
71-
72-
/// Adding body to the request if exists
73-
if (body != null) {
74-
request.body = jsonEncode(body);
75-
}
76-
77-
Future<http.StreamedResponse> response = _client.send(request);
78-
79-
/// Listening to the response as a stream
80-
response.asStream().listen((data) {
81-
/// Applying transforms and listening to it
82-
data.stream
83-
..transform(Utf8Decoder()).transform(LineSplitter()).listen(
84-
(dataLine) {
85-
if (dataLine.isEmpty) {
86-
/// This means that the complete event set has been read.
87-
/// We then add the event to the stream
88-
streamController.add(currentSSEModel);
89-
currentSSEModel = SSEModel(data: '', id: '', event: '');
90-
return;
91-
}
92-
93-
/// Get the match of each line through the regex
94-
Match match = lineRegex.firstMatch(dataLine)!;
95-
var field = match.group(1);
96-
if (field!.isEmpty) {
97-
return;
98-
}
99-
var value = '';
100-
if (field == 'data') {
101-
// If the field is data, we get the data through the substring
102-
value = dataLine.substring(
103-
5,
104-
);
105-
} else {
106-
value = match.group(2) ?? '';
107-
}
108-
switch (field) {
109-
case 'event':
110-
currentSSEModel.event = value;
111-
break;
112-
case 'data':
113-
currentSSEModel.data =
114-
(currentSSEModel.data ?? '') + value + '\n';
115-
break;
116-
case 'id':
117-
currentSSEModel.id = value;
118-
break;
119-
case 'retry':
120-
break;
121-
default:
122-
print('---ERROR---');
123-
print(dataLine);
124-
_retryConnection(
125-
method: method,
126-
url: url,
127-
header: header,
128-
streamController: streamController,
129-
);
130-
}
131-
},
132-
onError: (e, s) {
58+
try {
59+
_client = http.Client();
60+
var request = http.Request(
61+
method == SSERequestType.GET ? "GET" : "POST",
62+
Uri.parse(url),
63+
);
64+
65+
header.forEach((key, value) {
66+
request.headers[key] = value;
67+
});
68+
69+
if (body != null) {
70+
request.body = jsonEncode(body);
71+
}
72+
73+
Future<http.StreamedResponse> response = _client.send(request);
74+
75+
response.asStream().listen((data) {
76+
data.stream.transform(Utf8Decoder()).transform(LineSplitter()).listen(
77+
(dataLine) {
78+
if (dataLine.isEmpty) {
79+
streamController.add(currentSSEModel);
80+
currentSSEModel = SSEModel(data: '', id: '', event: '');
81+
return;
82+
}
83+
84+
Match? match = lineRegex.firstMatch(dataLine);
85+
if (match == null) return;
86+
var field = match.group(1);
87+
if (field == null || field.isEmpty) return;
88+
var value = match.group(2) ?? '';
89+
90+
switch (field) {
91+
case 'event':
92+
currentSSEModel.event = value;
93+
break;
94+
case 'data':
95+
currentSSEModel.data = (currentSSEModel.data ?? '') + value + '\n';
96+
break;
97+
case 'id':
98+
currentSSEModel.id = value;
99+
break;
100+
default:
133101
print('---ERROR---');
134-
print(e);
102+
print(dataLine);
135103
_retryConnection(
136104
method: method,
137105
url: url,
138106
header: header,
139-
body: body,
140107
streamController: streamController,
108+
body: body,
109+
retriesLeft: retriesLeft,
110+
maxRetries: maxRetries,
141111
);
142-
},
112+
}
113+
},
114+
onError: (e, s) {
115+
print('---ERROR---');
116+
print(e);
117+
_retryConnection(
118+
method: method,
119+
url: url,
120+
header: header,
121+
body: body,
122+
streamController: streamController,
123+
retriesLeft: retriesLeft,
124+
maxRetries: maxRetries,
143125
);
144-
}, onError: (e, s) {
145-
print('---ERROR---');
146-
print(e);
147-
_retryConnection(
148-
method: method,
149-
url: url,
150-
header: header,
151-
body: body,
152-
streamController: streamController,
153-
);
154-
});
155-
} catch (e) {
126+
},
127+
);
128+
}, onError: (e, s) {
156129
print('---ERROR---');
157130
print(e);
158131
_retryConnection(
@@ -161,13 +134,27 @@ class SSEClient {
161134
header: header,
162135
body: body,
163136
streamController: streamController,
137+
retriesLeft: retriesLeft,
138+
maxRetries: maxRetries,
164139
);
165-
}
166-
return streamController.stream;
140+
});
141+
} catch (e) {
142+
print('---ERROR---');
143+
print(e);
144+
_retryConnection(
145+
method: method,
146+
url: url,
147+
header: header,
148+
body: body,
149+
streamController: streamController,
150+
retriesLeft: retriesLeft,
151+
maxRetries: maxRetries,
152+
);
167153
}
154+
155+
return streamController.stream;
168156
}
169157

170-
/// Unsubscribe from the SSE.
171158
static void unsubscribeFromSSE() {
172159
_client.close();
173160
}

0 commit comments

Comments
 (0)