@@ -5,6 +5,7 @@ import 'dart:math';
55import 'package:collection/collection.dart' ;
66import 'package:flutter/material.dart' ;
77import 'package:flutter_portal/flutter_portal.dart' ;
8+ import 'package:rxdart/rxdart.dart' ;
89import 'package:stream_chat_flutter/scrollable_positioned_list/scrollable_positioned_list.dart' ;
910import 'package:stream_chat_flutter/src/message_list_view/floating_date_divider.dart' ;
1011import 'package:stream_chat_flutter/src/message_list_view/loading_indicator.dart' ;
@@ -416,7 +417,7 @@ class _StreamMessageListViewState extends State<StreamMessageListView> {
416417 MessageListController get _messageListController =>
417418 widget.messageListController ?? _defaultController;
418419
419- StreamSubscription ? _messageNewListener;
420+ StreamSubscription < Message > ? _messageNewListener;
420421 StreamSubscription ? _userReadListener;
421422
422423 @override
@@ -444,43 +445,41 @@ class _StreamMessageListViewState extends State<StreamMessageListView> {
444445 debouncedMarkRead.cancel ();
445446 debouncedMarkThreadRead.cancel ();
446447
447- _messageNewListener? .cancel ();
448- _userReadListener? .cancel ();
449-
450448 _unreadState.value = _readUnreadSnapshot ();
451449
452- _messageNewListener =
453- streamChannel ! .channel. on ( EventType .messageNew). listen ((event ) {
454- final message = event.message;
455- if (message == null ) return ;
456- if (message.parentId != widget.parentMessage ? .id) return ;
450+ final state = streamChannel ? .channel.state;
451+ final newMessageStream = switch (widget.parentMessage ? .id ) {
452+ final parentId ? => state ? . newThreadMessageStream (parentId),
453+ _ => state ? .newMessageStream,
454+ } ;
457455
456+ _messageNewListener? .cancel ();
457+ _messageNewListener = newMessageStream? .listen ((newMessage) {
458458 // Don't fight a scroll already in motion (drag, fling, or
459459 // still-running animated scrollTo).
460460 if (_scrollController? .isScrolling == true ) return ;
461461
462462 final currentUser = streamChannel? .channel.client.state.currentUser;
463- final isOwnMessage = message .user? .id == currentUser? .id;
463+ final isOwnMessage = newMessage .user? .id == currentUser? .id;
464464 final isAtBottom = ! _showScrollToBottom.value;
465465
466466 // Auto-scroll on own messages always; on others only when the
467467 // user is already at the bottom. For "far from bottom", SPL's
468- // itemKeyBuilder anchor preservation keeps the visible
469- // content pinned.
468+ // itemKeyBuilder anchor preservation keeps the visible content
469+ // pinned.
470470 if (! isOwnMessage && ! isAtBottom) return ;
471471
472- // Synchronous (not post-frame) so SPL's `_scrollTo` clears
473- // its anchor key before the next `didUpdateWidget` — otherwise
474- // anchor preservation would yank the layout back and produce
475- // a visible "shift, then animate" glitch.
476- if (_scrollController case final controller?
477- when controller.isAttached) {
478- controller.scrollTo (index: 0 );
472+ // Synchronous (not post-frame) so SPL's `_scrollTo` clears its
473+ // anchor key before the next `didUpdateWidget` — otherwise
474+ // anchor preservation would yank the layout back and produce a
475+ // visible "shift, then animate" glitch.
476+ if (_scrollController case final c? when c.isAttached) {
477+ c.scrollTo (index: 0 );
479478 }
480479 });
481480
482- _userReadListener =
483- streamChannel ! .channel. state? .currentUserReadStream.listen ((_) {
481+ _userReadListener? . cancel ();
482+ _userReadListener = state? .currentUserReadStream.listen ((_) {
484483 _unreadState.value = _readUnreadSnapshot ();
485484 });
486485 }
@@ -1508,3 +1507,92 @@ class _StreamMessageListViewState extends State<StreamMessageListView> {
15081507 }
15091508 }
15101509}
1510+
1511+ /// Stream helpers for observing newly arrived messages on a channel,
1512+ /// either in the main message list or scoped to a thread.
1513+ extension on ChannelClientState {
1514+ /// A stream that emits each newly arrived bottom message in
1515+ /// [messages] .
1516+ ///
1517+ /// Fires for every upstream that grows the list, including
1518+ /// server-confirmed `message.new` events, optimistic local sends,
1519+ /// and any other update that appends to the tail.
1520+ ///
1521+ /// A new arrival is detected when the bottom message's id changes
1522+ /// **and** its [Message.createdAt] is strictly after the previously
1523+ /// observed tail. Edits, reactions, tail deletions, and pruning are
1524+ /// therefore ignored.
1525+ ///
1526+ /// Gated on [isUpToDate] : while the channel is loaded around a
1527+ /// historic message the stream stays silent, and the first emission
1528+ /// after the gate re-opens re-seeds the baseline without yielding.
1529+ Stream <Message > get newMessageStream async * {
1530+ var wasUpToDate = isUpToDate;
1531+ var lastSeen = wasUpToDate ? messages.lastOrNull : null ;
1532+
1533+ await for (final updated in messagesStream) {
1534+ if (! isUpToDate) {
1535+ wasUpToDate = false ;
1536+ lastSeen = null ;
1537+ continue ;
1538+ }
1539+
1540+ // Re-seed without yielding: the gate just re-opened, the next
1541+ // emission is a wholesale window replacement, not an arrival.
1542+ if (! wasUpToDate) {
1543+ wasUpToDate = true ;
1544+ lastSeen = updated.lastOrNull;
1545+ continue ;
1546+ }
1547+
1548+ final newLast = updated.lastOrNull;
1549+ if (newLast == null ) {
1550+ lastSeen = null ;
1551+ continue ;
1552+ }
1553+
1554+ final isNewArrival = lastSeen == null ||
1555+ (newLast.id != lastSeen.id &&
1556+ newLast.createdAt.isAfter (lastSeen.createdAt));
1557+
1558+ lastSeen = newLast;
1559+ if (isNewArrival) yield newLast;
1560+ }
1561+ }
1562+
1563+ /// A stream that emits each newly arrived reply at the bottom of
1564+ /// the thread identified by [parentMessageId] .
1565+ ///
1566+ /// Fires for every upstream that grows the thread, including
1567+ /// server-confirmed replies, optimistic local sends, and any other
1568+ /// update that appends to the tail of [threads] .
1569+ ///
1570+ /// A new arrival is detected when the bottom reply's id changes
1571+ /// **and** its [Message.createdAt] is strictly after the previously
1572+ /// observed tail. Edits, reactions, tail deletions, and pruning are
1573+ /// therefore ignored.
1574+ ///
1575+ /// Threads load lazily, so the stream stays silent until [threads]
1576+ /// carries replies for [parentMessageId] ; that first snapshot seeds
1577+ /// the baseline without yielding.
1578+ Stream <Message > newThreadMessageStream (String parentMessageId) async * {
1579+ final threadMessages =
1580+ threadsStream.mapNotNull ((it) => it[parentMessageId]);
1581+
1582+ var lastSeen = threads[parentMessageId]? .lastOrNull;
1583+ await for (final updated in threadMessages) {
1584+ final newLast = updated.lastOrNull;
1585+ if (newLast == null ) {
1586+ lastSeen = null ;
1587+ continue ;
1588+ }
1589+
1590+ final isNewArrival = lastSeen == null ||
1591+ (newLast.id != lastSeen.id &&
1592+ newLast.createdAt.isAfter (lastSeen.createdAt));
1593+
1594+ lastSeen = newLast;
1595+ if (isNewArrival) yield newLast;
1596+ }
1597+ }
1598+ }
0 commit comments