@@ -5,6 +5,7 @@ import 'dart:math';
55import 'package:collection/collection.dart' ;
66import 'package:flutter/material.dart' ;
77import 'package:flutter_portal/flutter_portal.dart' ;
8+ import 'package:rxdart/rxdart.dart' ;
89import 'package:stream_chat_flutter/scrollable_positioned_list/scrollable_positioned_list.dart' ;
910import 'package:stream_chat_flutter/src/message_list_view/floating_date_divider.dart' ;
1011import 'package:stream_chat_flutter/src/message_list_view/loading_indicator.dart' ;
@@ -416,7 +417,7 @@ class _StreamMessageListViewState extends State<StreamMessageListView> {
416417 MessageListController get _messageListController =>
417418 widget.messageListController ?? _defaultController;
418419
419- StreamSubscription ? _messageNewListener;
420+ StreamSubscription < Message > ? _messageNewListener;
420421 StreamSubscription ? _userReadListener;
421422
422423 @override
@@ -444,17 +445,16 @@ class _StreamMessageListViewState extends State<StreamMessageListView> {
444445 debouncedMarkRead.cancel ();
445446 debouncedMarkThreadRead.cancel ();
446447
447- _messageNewListener? .cancel ();
448- _userReadListener? .cancel ();
449-
450448 _unreadState.value = _readUnreadSnapshot ();
451449
452- _messageNewListener =
453- streamChannel ! .channel. on ( EventType .messageNew). listen ((event ) {
454- final message = event.message;
455- if (message == null ) return ;
456- if (message.parentId != widget.parentMessage ? .id) return ;
450+ final state = streamChannel ? .channel.state;
451+ final newMessageStream = switch (widget.parentMessage ? .id ) {
452+ final parentId ? => state ? . newThreadMessageStream (parentId),
453+ _ => state ? .newMessageStream,
454+ } ;
457455
456+ _messageNewListener? .cancel ();
457+ _messageNewListener = newMessageStream? .listen ((message) {
458458 // Don't fight a scroll already in motion (drag, fling, or
459459 // still-running animated scrollTo).
460460 if (_scrollController? .isScrolling == true ) return ;
@@ -479,8 +479,8 @@ class _StreamMessageListViewState extends State<StreamMessageListView> {
479479 }
480480 });
481481
482- _userReadListener =
483- streamChannel ! .channel. state? .currentUserReadStream.listen ((_) {
482+ _userReadListener? . cancel ();
483+ _userReadListener = state? .currentUserReadStream.listen ((_) {
484484 _unreadState.value = _readUnreadSnapshot ();
485485 });
486486 }
@@ -1508,3 +1508,100 @@ class _StreamMessageListViewState extends State<StreamMessageListView> {
15081508 }
15091509 }
15101510}
1511+
1512+ /// Stream helpers for observing newly arrived messages on a channel,
1513+ /// either in the main message list or scoped to a thread.
1514+ extension on ChannelClientState {
1515+ // True when [candidate] represents a newer tail message than
1516+ // [previous].
1517+ //
1518+ // A new arrival requires both a different id *and* a strictly later
1519+ // [Message.createdAt], so edits, reactions, and reorderings are
1520+ // ignored.
1521+ bool _isNewTailArrival (Message candidate, Message ? previous) {
1522+ if (previous == null ) return true ;
1523+ return candidate.id != previous.id &&
1524+ candidate.createdAt.isAfter (previous.createdAt);
1525+ }
1526+
1527+ /// A stream that emits each newly arrived bottom message in
1528+ /// [messages] .
1529+ ///
1530+ /// Fires for every upstream that grows the list, including
1531+ /// server-confirmed `message.new` events, optimistic local sends,
1532+ /// and any other update that appends to the tail.
1533+ ///
1534+ /// A new arrival is detected when the bottom message's id changes
1535+ /// **and** its [Message.createdAt] is strictly after the previously
1536+ /// observed tail. Edits, reactions, tail deletions, and pruning are
1537+ /// therefore ignored.
1538+ ///
1539+ /// Gated on [isUpToDate] : while the channel is loaded around a
1540+ /// historic message the stream stays silent, and the first emission
1541+ /// after the gate re-opens re-seeds the baseline without yielding.
1542+ Stream <Message > get newMessageStream async * {
1543+ var wasUpToDate = isUpToDate;
1544+ var lastSeen = wasUpToDate ? messages.lastOrNull : null ;
1545+
1546+ await for (final updated in messagesStream) {
1547+ if (! isUpToDate) {
1548+ wasUpToDate = false ;
1549+ lastSeen = null ;
1550+ continue ;
1551+ }
1552+
1553+ // Re-seed without yielding: the gate just re-opened, the next
1554+ // emission is a wholesale window replacement, not an arrival.
1555+ if (! wasUpToDate) {
1556+ wasUpToDate = true ;
1557+ lastSeen = updated.lastOrNull;
1558+ continue ;
1559+ }
1560+
1561+ final newLast = updated.lastOrNull;
1562+ if (newLast == null ) {
1563+ lastSeen = null ;
1564+ continue ;
1565+ }
1566+
1567+ final isNewArrival = _isNewTailArrival (newLast, lastSeen);
1568+ lastSeen = newLast;
1569+ if (! isNewArrival) continue ;
1570+ yield newLast;
1571+ }
1572+ }
1573+
1574+ /// A stream that emits each newly arrived reply at the bottom of
1575+ /// the thread identified by [parentMessageId] .
1576+ ///
1577+ /// Fires for every upstream that grows the thread, including
1578+ /// server-confirmed replies, optimistic local sends, and any other
1579+ /// update that appends to the tail of [threads] .
1580+ ///
1581+ /// A new arrival is detected when the bottom reply's id changes
1582+ /// **and** its [Message.createdAt] is strictly after the previously
1583+ /// observed tail. Edits, reactions, tail deletions, and pruning are
1584+ /// therefore ignored.
1585+ ///
1586+ /// Threads load lazily, so the stream stays silent until [threads]
1587+ /// carries replies for [parentMessageId] ; that first snapshot seeds
1588+ /// the baseline without yielding.
1589+ Stream <Message > newThreadMessageStream (String parentMessageId) async * {
1590+ final threadMessages =
1591+ threadsStream.mapNotNull ((it) => it[parentMessageId]);
1592+
1593+ var lastSeen = threads[parentMessageId]? .lastOrNull;
1594+ await for (final updated in threadMessages) {
1595+ final newLast = updated.lastOrNull;
1596+ if (newLast == null ) {
1597+ lastSeen = null ;
1598+ continue ;
1599+ }
1600+
1601+ final isNewArrival = _isNewTailArrival (newLast, lastSeen);
1602+ lastSeen = newLast;
1603+ if (! isNewArrival) continue ;
1604+ yield newLast;
1605+ }
1606+ }
1607+ }
0 commit comments