@@ -14,8 +14,7 @@ import '/_common.dart';
1414
1515// ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░
1616
17- abstract class StreamService <TData extends Object >
18- with ServiceMixin , StreamServiceMixin <TData > {
17+ abstract class StreamService <TData extends Object > with ServiceMixin , StreamServiceMixin <TData > {
1918 StreamService ();
2019}
2120
@@ -27,22 +26,20 @@ mixin StreamServiceMixin<TData extends Object> on ServiceMixin {
2726 //
2827
2928 Option <SafeCompleter <TData >> _initDataCompleter = const None ();
30- Option <Resolvable <TData >> get initialData =>
31- _initDataCompleter.map ((e) => e.resolvable ());
29+ Option <Resolvable <TData >> get initialData => _initDataCompleter.map ((e) => e.resolvable ());
3230
3331 Option <StreamSubscription <Result <TData >>> _streamSubscription = const None ();
3432
3533 Option <StreamController <Result <TData >>> _streamController = const None ();
36- Option <Stream <Result <TData >>> get stream =>
37- _streamController.map ((c) => c.stream);
34+ Option <Stream <Result <TData >>> get stream => _streamController.map ((c) => c.stream);
3835
3936 //
4037 //
4138 //
4239
4340 @override
4441 @mustCallSuper
45- provideInitListeners (void _) => [(_) => _startStream ()];
42+ provideInitListeners (void _) => [(_) => restartStream ()];
4643
4744 @override
4845 @mustCallSuper
@@ -55,8 +52,6 @@ mixin StreamServiceMixin<TData extends Object> on ServiceMixin {
5552 }).end ();
5653 return Sync .value (Ok (Unit ()));
5754 },
58- // Or you can do this, but its less effective:
59- //(_) => _stopStream()
6055 ];
6156 }
6257
@@ -69,21 +64,27 @@ mixin StreamServiceMixin<TData extends Object> on ServiceMixin {
6964 _streamSubscription.ifSome ((sub) => sub.unwrap ().resume ()).end ();
7065 return Sync .value (Ok (Unit ()));
7166 },
72- // Or you can do this, but its less effective:
73- //(_) => _startStream()
7467 ];
7568 }
7669
7770 @override
7871 @mustCallSuper
79- provideDisposeListeners (void _) => [(_) => _stopStream ()];
72+ provideDisposeListeners (void _) => [(_) => stopStream ()];
8073
8174 //
8275 //
8376 //
8477
85- Resolvable <Unit > _startStream () {
86- return _stopStream ().map ((_) {
78+ Resolvable <Unit > restartStream () {
79+ return stopStream ().map ((_) => _startStream ()).flatten ();
80+ }
81+
82+ //
83+ //
84+ //
85+
86+ Sync <Unit > _startStream () {
87+ return Sync (() {
8788 _initDataCompleter = Some (SafeCompleter <TData >());
8889 final controller = StreamController <Result <TData >>.broadcast ();
8990 _streamController = Some (controller);
@@ -103,7 +104,8 @@ mixin StreamServiceMixin<TData extends Object> on ServiceMixin {
103104 //
104105 //
105106
106- Resolvable <Unit > _stopStream () {
107+ @protected
108+ Resolvable <Unit > stopStream () {
107109 UNSAFE :
108110 {
109111 final prevSubscription = _streamSubscription;
0 commit comments