Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,8 @@ build/
!**/ios/**/default.mode2v3
!**/ios/**/default.pbxuser
!**/ios/**/default.perspectivev3

**/coverage/**

# FVM Version Cache
.fvm/
126 changes: 102 additions & 24 deletions lib/flutter_client_sse.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@ library flutter_client_sse;

import 'dart:async';
import 'dart:convert';

import 'package:flutter_client_sse/constants/sse_request_type_enum.dart';
import 'package:flutter_client_sse/retry_options.dart';
import 'package:flutter_client_sse/utils.dart';
import 'package:http/http.dart' as http;
import 'package:logging/logging.dart';

part 'sse_event_model.dart';

/// A client for subscribing to Server-Sent Events (SSE).
class SSEClient {
static http.Client _client = new http.Client();
static final _log = Logger('SSEClient');
static bool _stopSignal = false;

/// Retry the SSE connection after a delay.
///
Expand All @@ -17,48 +24,96 @@ class SSEClient {
/// [header] is a map of request headers.
/// [body] is an optional request body for POST requests.
/// [streamController] is required to persist the stream from the old connection
static void _retryConnection(
{required SSERequestType method,
required String url,
required Map<String, String> header,
required StreamController<SSEModel> streamController,
Map<String, dynamic>? body}) {
print('---RETRY CONNECTION---');
Future.delayed(Duration(seconds: 5), () {
/// [retryOptions] is the options for retrying the connection.
/// [currentRetry] is the current retry count.
///
static void _retryConnection({
required SSERequestType method,
required String url,
required Map<String, String> header,
required StreamController<SSEModel> streamController,
Map<String, dynamic>? body,
required RetryOptions retryOptions,
required int currentRetry,
}) {
if (_stopSignal) {
_log.info('---NO RETRY: STOP SIGNAL RECEIVED---');
streamController.close();
return;
}

_log.finest('$currentRetry retry of ${retryOptions.maxRetry} retries');

if (retryOptions.maxRetry != 0 && currentRetry >= retryOptions.maxRetry) {
_log.info('---MAX RETRY REACHED---');
retryOptions.limitReachedCallback?.call();
streamController.close();
return;
}
_log.info('---RETRY CONNECTION---');
int delay = _delay(
currentRetry, retryOptions.minRetryTime, retryOptions.maxRetryTime);
_log.finest('waiting for $delay ms');

Future.delayed(Duration(milliseconds: delay), () {
subscribeToSSE(
method: method,
url: url,
header: header,
body: body,
oldStreamController: streamController,
retryOptions: retryOptions,
retryCount: currentRetry + 1,
);
});
}

static int _delay(int currentRetry, int minRetryTime, int retryTime) {
return Utils.expBackoff(
minRetryTime, retryTime, currentRetry, _defaultJitterFn);
}

static int _defaultJitterFn(int num) {
var randomFactor = 0.26;

return Utils.jitter(num, randomFactor);
}

/// Subscribe to Server-Sent Events.
///
/// [method] is the request method (GET or POST).
/// [url] is the URL of the SSE endpoint.
/// [header] is a map of request headers.
/// [body] is an optional request body for POST requests.
/// [oldStreamController] stream controller, used to retry to persist the
/// stream from the old connection.
/// [client] is an optional http client used for testing purpose
/// or custom client.
/// [retryOptions] is the options for retrying the connection.
/// [retryCount] is the current retry count.
///
/// Returns a [Stream] of [SSEModel] representing the SSE events.
static Stream<SSEModel> subscribeToSSE(
{required SSERequestType method,
required String url,
required Map<String, String> header,
StreamController<SSEModel>? oldStreamController,
Map<String, dynamic>? body}) {
static Stream<SSEModel> subscribeToSSE({
required SSERequestType method,
required String url,
required Map<String, String> header,
StreamController<SSEModel>? oldStreamController,
http.Client? client,
Map<String, dynamic>? body,
RetryOptions? retryOptions,
int retryCount = 0,
}) {
RetryOptions _retryOptions = retryOptions ?? RetryOptions();
StreamController<SSEModel> streamController = StreamController();
if (oldStreamController != null) {
streamController = oldStreamController;
}
var lineRegex = RegExp(r'^([^:]*)(?::)?(?: )?(.*)?$');
var currentSSEModel = SSEModel(data: '', id: '', event: '');
print("--SUBSCRIBING TO SSE---");
_log.info("--SUBSCRIBING TO SSE---");
while (true) {
try {
_client = http.Client();
_client = client ?? http.Client();
var request = new http.Request(
method == SSERequestType.GET ? "GET" : "POST",
Uri.parse(url),
Expand All @@ -78,6 +133,20 @@ class SSEClient {

/// Listening to the response as a stream
response.asStream().listen((data) {
if (data.statusCode != 200) {
_log.severe('---ERROR CODE ${data.statusCode}---');
_retryConnection(
method: method,
url: url,
header: header,
body: body,
streamController: streamController,
retryOptions: _retryOptions,
currentRetry: retryCount,
);
return;
}

/// Applying transforms and listening to it
data.stream
..transform(Utf8Decoder()).transform(LineSplitter()).listen(
Expand Down Expand Up @@ -119,48 +188,56 @@ class SSEClient {
case 'retry':
break;
default:
print('---ERROR---');
print(dataLine);
_log.severe('---ERROR---');
_log.severe(dataLine);
_retryConnection(
method: method,
url: url,
header: header,
streamController: streamController,
retryOptions: _retryOptions,
currentRetry: retryCount,
);
}
},
onError: (e, s) {
print('---ERROR---');
print(e);
_log.severe('---ERROR---');
_log.severe(e);
_retryConnection(
method: method,
url: url,
header: header,
body: body,
streamController: streamController,
currentRetry: retryCount,
retryOptions: _retryOptions,
);
},
);
}, onError: (e, s) {
print('---ERROR---');
print(e);
_log.severe('---ERROR---');
_log.severe(e);
_retryConnection(
method: method,
url: url,
header: header,
body: body,
streamController: streamController,
retryOptions: _retryOptions,
currentRetry: retryCount,
);
});
} catch (e) {
print('---ERROR---');
print(e);
_log.severe('---ERROR---');
_log.severe(e);
_retryConnection(
method: method,
url: url,
header: header,
body: body,
streamController: streamController,
retryOptions: _retryOptions,
currentRetry: retryCount,
);
}
return streamController.stream;
Expand All @@ -169,6 +246,7 @@ class SSEClient {

/// Unsubscribe from the SSE.
static void unsubscribeFromSSE() {
_stopSignal = true;
_client.close();
}
}
13 changes: 13 additions & 0 deletions lib/retry_options.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
class RetryOptions {
int maxRetryTime;
int minRetryTime;
int maxRetry;
Future Function()? limitReachedCallback;

RetryOptions({
this.maxRetryTime = 5000,
this.minRetryTime = 5000,
this.maxRetry = 5,
this.limitReachedCallback,
});
}
35 changes: 35 additions & 0 deletions lib/utils.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import 'dart:math';

class Utils {
static int _defaultJitterFn(int num) => num;

static int jitter(int baseTime, double randomFactor) {
var rest = baseTime * randomFactor;
var rng = Random();
var jitter = (baseTime - rest) + rng.nextDouble() * rest;

return jitter.toInt();
}

static int expBackoff(
int initial,
int max,
int actualRetry, [
Function? jitterFn,
]) {
Function curatedFn;
curatedFn = jitterFn ?? _defaultJitterFn;
var base = initial << actualRetry;
var willWait = 0;
var isOverflowing = base <= 0;
willWait = (base > max || isOverflowing) ? curatedFn(max) : curatedFn(base);

return willWait.toInt();
}

static String? checkString(String data) {
var trim = data.trim();

return trim.isEmpty ? '' : trim;
}
}
Loading