@@ -5,6 +5,7 @@ import 'dart:math';
55import 'package:collection/collection.dart' ;
66import 'package:flutter/material.dart' ;
77import 'package:flutter_portal/flutter_portal.dart' ;
8+ import 'package:rxdart/rxdart.dart' ;
89import 'package:stream_chat_flutter/scrollable_positioned_list/scrollable_positioned_list.dart' ;
910import 'package:stream_chat_flutter/src/message_list_view/floating_date_divider.dart' ;
1011import 'package:stream_chat_flutter/src/message_list_view/loading_indicator.dart' ;
@@ -416,7 +417,7 @@ class _StreamMessageListViewState extends State<StreamMessageListView> {
416417 MessageListController get _messageListController =>
417418 widget.messageListController ?? _defaultController;
418419
419- StreamSubscription ? _messageNewListener;
420+ StreamSubscription < Message > ? _messageNewListener;
420421 StreamSubscription ? _userReadListener;
421422
422423 @override
@@ -444,43 +445,46 @@ class _StreamMessageListViewState extends State<StreamMessageListView> {
444445 debouncedMarkRead.cancel ();
445446 debouncedMarkThreadRead.cancel ();
446447
447- _messageNewListener? .cancel ();
448448 _userReadListener? .cancel ();
449+ _messageNewListener? .cancel ();
449450
450451 _unreadState.value = _readUnreadSnapshot ();
451452
452- _messageNewListener =
453- streamChannel! .channel.on (EventType .messageNew).listen ((event) {
454- final message = event.message;
455- if (message == null ) return ;
456- if (message.parentId != widget.parentMessage? .id) return ;
457-
453+ // `newMessageStream` fires on every path that grows the bottom
454+ // of the list — server-confirmed `message.new` events AND
455+ // optimistic local sends — so we follow to the new bottom for
456+ // both. Gated on `isUpToDate` to match Android/iOS/RN: no
457+ // auto-scroll while the channel is loaded around a historic id.
458+ final state = streamChannel! .channel.state! ;
459+ final newMessageStream = switch (widget.parentMessage? .id) {
460+ final parentId? => state.newThreadMessageStream (parentId),
461+ _ => state.newMessageStream,
462+ };
463+ _messageNewListener = newMessageStream.listen ((newMessage) {
458464 // Don't fight a scroll already in motion (drag, fling, or
459465 // still-running animated scrollTo).
460466 if (_scrollController? .isScrolling == true ) return ;
461467
462468 final currentUser = streamChannel? .channel.client.state.currentUser;
463- final isOwnMessage = message .user? .id == currentUser? .id;
469+ final isOwnMessage = newMessage .user? .id == currentUser? .id;
464470 final isAtBottom = ! _showScrollToBottom.value;
465471
466472 // Auto-scroll on own messages always; on others only when the
467473 // user is already at the bottom. For "far from bottom", SPL's
468- // itemKeyBuilder anchor preservation keeps the visible
469- // content pinned.
474+ // itemKeyBuilder anchor preservation keeps the visible content
475+ // pinned.
470476 if (! isOwnMessage && ! isAtBottom) return ;
471477
472- // Synchronous (not post-frame) so SPL's `_scrollTo` clears
473- // its anchor key before the next `didUpdateWidget` — otherwise
474- // anchor preservation would yank the layout back and produce
475- // a visible "shift, then animate" glitch.
476- if (_scrollController case final controller?
477- when controller.isAttached) {
478- controller.scrollTo (index: 0 );
478+ // Synchronous (not post-frame) so SPL's `_scrollTo` clears its
479+ // anchor key before the next `didUpdateWidget` — otherwise
480+ // anchor preservation would yank the layout back and produce a
481+ // visible "shift, then animate" glitch.
482+ if (_scrollController case final c? when c.isAttached) {
483+ c.scrollTo (index: 0 );
479484 }
480485 });
481486
482- _userReadListener =
483- streamChannel! .channel.state? .currentUserReadStream.listen ((_) {
487+ _userReadListener = state.currentUserReadStream.listen ((_) {
484488 _unreadState.value = _readUnreadSnapshot ();
485489 });
486490 }
@@ -1508,3 +1512,92 @@ class _StreamMessageListViewState extends State<StreamMessageListView> {
15081512 }
15091513 }
15101514}
1515+
1516+ /// Stream helpers for observing newly arrived messages on a channel,
1517+ /// either in the main message list or scoped to a thread.
1518+ extension on ChannelClientState {
1519+ /// A stream that emits each newly arrived bottom message in
1520+ /// [messages] .
1521+ ///
1522+ /// Fires for every upstream that grows the list, including
1523+ /// server-confirmed `message.new` events, optimistic local sends,
1524+ /// and any other update that appends to the tail.
1525+ ///
1526+ /// A new arrival is detected when the bottom message's id changes
1527+ /// **and** its [Message.createdAt] is strictly after the previously
1528+ /// observed tail. Edits, reactions, tail deletions, and pruning are
1529+ /// therefore ignored.
1530+ ///
1531+ /// Gated on [isUpToDate] : while the channel is loaded around a
1532+ /// historic message the stream stays silent, and the first emission
1533+ /// after the gate re-opens re-seeds the baseline without yielding.
1534+ Stream <Message > get newMessageStream async * {
1535+ var wasUpToDate = isUpToDate;
1536+ var lastSeen = wasUpToDate ? messages.lastOrNull : null ;
1537+
1538+ await for (final updated in messagesStream) {
1539+ if (! isUpToDate) {
1540+ wasUpToDate = false ;
1541+ lastSeen = null ;
1542+ continue ;
1543+ }
1544+
1545+ // Re-seed without yielding: the gate just re-opened, the next
1546+ // emission is a wholesale window replacement, not an arrival.
1547+ if (! wasUpToDate) {
1548+ wasUpToDate = true ;
1549+ lastSeen = updated.lastOrNull;
1550+ continue ;
1551+ }
1552+
1553+ final newLast = updated.lastOrNull;
1554+ if (newLast == null ) {
1555+ lastSeen = null ;
1556+ continue ;
1557+ }
1558+
1559+ final isNewArrival = lastSeen == null ||
1560+ (newLast.id != lastSeen.id &&
1561+ newLast.createdAt.isAfter (lastSeen.createdAt));
1562+
1563+ lastSeen = newLast;
1564+ if (isNewArrival) yield newLast;
1565+ }
1566+ }
1567+
1568+ /// A stream that emits each newly arrived reply at the bottom of
1569+ /// the thread identified by [parentMessageId] .
1570+ ///
1571+ /// Fires for every upstream that grows the thread, including
1572+ /// server-confirmed replies, optimistic local sends, and any other
1573+ /// update that appends to the tail of [threads] .
1574+ ///
1575+ /// A new arrival is detected when the bottom reply's id changes
1576+ /// **and** its [Message.createdAt] is strictly after the previously
1577+ /// observed tail. Edits, reactions, tail deletions, and pruning are
1578+ /// therefore ignored.
1579+ ///
1580+ /// Threads load lazily, so the stream stays silent until [threads]
1581+ /// carries replies for [parentMessageId] ; that first snapshot seeds
1582+ /// the baseline without yielding.
1583+ Stream <Message > newThreadMessageStream (String parentMessageId) async * {
1584+ final threadMessages =
1585+ threadsStream.mapNotNull ((it) => it[parentMessageId]);
1586+
1587+ var lastSeen = threads[parentMessageId]? .lastOrNull;
1588+ await for (final updated in threadMessages) {
1589+ final newLast = updated.lastOrNull;
1590+ if (newLast == null ) {
1591+ lastSeen = null ;
1592+ continue ;
1593+ }
1594+
1595+ final isNewArrival = lastSeen == null ||
1596+ (newLast.id != lastSeen.id &&
1597+ newLast.createdAt.isAfter (lastSeen.createdAt));
1598+
1599+ lastSeen = newLast;
1600+ if (isNewArrival) yield newLast;
1601+ }
1602+ }
1603+ }
0 commit comments