Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/stream_chat_flutter/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## Upcoming Changes

🐞 Fixed

- Fixed `StreamMessageListView` not auto-scrolling to the bottom on the user's own outgoing message
until the server confirmed it.

## 9.24.0

✅ Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ class _StreamMessageListViewState extends State<StreamMessageListView> {
MessageListController get _messageListController =>
widget.messageListController ?? _defaultController;

StreamSubscription? _messageNewListener;
StreamSubscription<Message>? _messageNewListener;
StreamSubscription? _userReadListener;

@override
Expand Down Expand Up @@ -444,17 +444,16 @@ class _StreamMessageListViewState extends State<StreamMessageListView> {
debouncedMarkRead.cancel();
debouncedMarkThreadRead.cancel();

_messageNewListener?.cancel();
_userReadListener?.cancel();

_unreadState.value = _readUnreadSnapshot();

_messageNewListener =
streamChannel!.channel.on(EventType.messageNew).listen((event) {
final message = event.message;
if (message == null) return;
if (message.parentId != widget.parentMessage?.id) return;
final state = streamChannel?.channel.state;
final newMessageStream = switch (widget.parentMessage?.id) {
final parentId? => state?.newThreadMessageStream(parentId),
_ => state?.newMessageStream,
};

_messageNewListener?.cancel();
_messageNewListener = newMessageStream?.listen((message) {
// Don't fight a scroll already in motion (drag, fling, or
// still-running animated scrollTo).
if (_scrollController?.isScrolling == true) return;
Expand All @@ -479,8 +478,8 @@ class _StreamMessageListViewState extends State<StreamMessageListView> {
}
});

_userReadListener =
streamChannel!.channel.state?.currentUserReadStream.listen((_) {
_userReadListener?.cancel();
_userReadListener = state?.currentUserReadStream.listen((_) {
_unreadState.value = _readUnreadSnapshot();
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'package:collection/collection.dart';
import 'package:rxdart/rxdart.dart';
import 'package:stream_chat_flutter/scrollable_positioned_list/scrollable_positioned_list.dart';
import 'package:stream_chat_flutter/stream_chat_flutter.dart';

Expand Down Expand Up @@ -101,3 +102,100 @@ bool isElementAtIndexVisible(
bool isInitialMessage(String id, StreamChannelState? channelState) {
return channelState!.initialMessageId == id;
}

/// Stream helpers for observing newly arrived messages on a channel,
/// either in the main message list or scoped to a thread.
extension NewMessageStreamX on ChannelClientState {
// True when [candidate] represents a newer tail message than
// [previous].
//
// A new arrival requires both a different id *and* a strictly later
// [Message.createdAt], so edits, reactions, and reorderings are
// ignored.
bool _isNewTailArrival(Message candidate, Message? previous) {
if (previous == null) return true;
return candidate.id != previous.id &&
candidate.createdAt.isAfter(previous.createdAt);
}

/// A stream that emits each newly arrived bottom message in
/// [messages].
///
/// Fires for every upstream that grows the list, including
/// server-confirmed `message.new` events, optimistic local sends,
/// and any other update that appends to the tail.
///
/// A new arrival is detected when the bottom message's id changes
/// **and** its [Message.createdAt] is strictly after the previously
/// observed tail. Edits, reactions, tail deletions, and pruning are
/// therefore ignored.
///
/// Gated on [isUpToDate]: while the channel is loaded around a
/// historic message the stream stays silent, and the first emission
/// after the gate re-opens re-seeds the baseline without yielding.
Stream<Message> get newMessageStream async* {
var wasUpToDate = isUpToDate;
var lastSeen = wasUpToDate ? messages.lastOrNull : null;

await for (final updated in messagesStream) {
if (!isUpToDate) {
wasUpToDate = false;
lastSeen = null;
continue;
}

// Re-seed without yielding: the gate just re-opened, the next
// emission is a wholesale window replacement, not an arrival.
if (!wasUpToDate) {
wasUpToDate = true;
lastSeen = updated.lastOrNull;
continue;
}

final newLast = updated.lastOrNull;
if (newLast == null) {
lastSeen = null;
continue;
}

final isNewArrival = _isNewTailArrival(newLast, lastSeen);
lastSeen = newLast;
if (!isNewArrival) continue;
yield newLast;
}
}

/// A stream that emits each newly arrived reply at the bottom of
/// the thread identified by [parentMessageId].
///
/// Fires for every upstream that grows the thread, including
/// server-confirmed replies, optimistic local sends, and any other
/// update that appends to the tail of [threads].
///
/// A new arrival is detected when the bottom reply's id changes
/// **and** its [Message.createdAt] is strictly after the previously
/// observed tail. Edits, reactions, tail deletions, and pruning are
/// therefore ignored.
///
/// Threads load lazily, so the stream stays silent until [threads]
/// carries replies for [parentMessageId]; that first snapshot seeds
/// the baseline without yielding.
Stream<Message> newThreadMessageStream(String parentMessageId) async* {
final threadMessages =
threadsStream.mapNotNull((it) => it[parentMessageId]);

var lastSeen = threads[parentMessageId]?.lastOrNull;
await for (final updated in threadMessages) {
final newLast = updated.lastOrNull;
if (newLast == null) {
lastSeen = null;
continue;
}

final isNewArrival = _isNewTailArrival(newLast, lastSeen);
lastSeen = newLast;
if (!isNewArrival) continue;
yield newLast;
}
}
}
Loading
Loading