|
1 | 1 | import 'package:collection/collection.dart'; |
| 2 | +import 'package:rxdart/rxdart.dart'; |
2 | 3 | import 'package:stream_chat_flutter/scrollable_positioned_list/scrollable_positioned_list.dart'; |
3 | 4 | import 'package:stream_chat_flutter/stream_chat_flutter.dart'; |
4 | 5 |
|
@@ -101,3 +102,100 @@ bool isElementAtIndexVisible( |
101 | 102 | bool isInitialMessage(String id, StreamChannelState? channelState) { |
102 | 103 | return channelState!.initialMessageId == id; |
103 | 104 | } |
| 105 | + |
| 106 | +/// Stream helpers for observing newly arrived messages on a channel, |
| 107 | +/// either in the main message list or scoped to a thread. |
| 108 | +extension NewMessageStreamX on ChannelClientState { |
| 109 | + // True when [candidate] represents a newer tail message than |
| 110 | + // [previous]. |
| 111 | + // |
| 112 | + // A new arrival requires both a different id *and* a strictly later |
| 113 | + // [Message.createdAt], so edits, reactions, and reorderings are |
| 114 | + // ignored. |
| 115 | + bool _isNewTailArrival(Message candidate, Message? previous) { |
| 116 | + if (previous == null) return true; |
| 117 | + return candidate.id != previous.id && |
| 118 | + candidate.createdAt.isAfter(previous.createdAt); |
| 119 | + } |
| 120 | + |
| 121 | + /// A stream that emits each newly arrived bottom message in |
| 122 | + /// [messages]. |
| 123 | + /// |
| 124 | + /// Fires for every upstream that grows the list, including |
| 125 | + /// server-confirmed `message.new` events, optimistic local sends, |
| 126 | + /// and any other update that appends to the tail. |
| 127 | + /// |
| 128 | + /// A new arrival is detected when the bottom message's id changes |
| 129 | + /// **and** its [Message.createdAt] is strictly after the previously |
| 130 | + /// observed tail. Edits, reactions, tail deletions, and pruning are |
| 131 | + /// therefore ignored. |
| 132 | + /// |
| 133 | + /// Gated on [isUpToDate]: while the channel is loaded around a |
| 134 | + /// historic message the stream stays silent, and the first emission |
| 135 | + /// after the gate re-opens re-seeds the baseline without yielding. |
| 136 | + Stream<Message> get newMessageStream async* { |
| 137 | + var wasUpToDate = isUpToDate; |
| 138 | + var lastSeen = wasUpToDate ? messages.lastOrNull : null; |
| 139 | + |
| 140 | + await for (final updated in messagesStream) { |
| 141 | + if (!isUpToDate) { |
| 142 | + wasUpToDate = false; |
| 143 | + lastSeen = null; |
| 144 | + continue; |
| 145 | + } |
| 146 | + |
| 147 | + // Re-seed without yielding: the gate just re-opened, the next |
| 148 | + // emission is a wholesale window replacement, not an arrival. |
| 149 | + if (!wasUpToDate) { |
| 150 | + wasUpToDate = true; |
| 151 | + lastSeen = updated.lastOrNull; |
| 152 | + continue; |
| 153 | + } |
| 154 | + |
| 155 | + final newLast = updated.lastOrNull; |
| 156 | + if (newLast == null) { |
| 157 | + lastSeen = null; |
| 158 | + continue; |
| 159 | + } |
| 160 | + |
| 161 | + final isNewArrival = _isNewTailArrival(newLast, lastSeen); |
| 162 | + lastSeen = newLast; |
| 163 | + if (!isNewArrival) continue; |
| 164 | + yield newLast; |
| 165 | + } |
| 166 | + } |
| 167 | + |
| 168 | + /// A stream that emits each newly arrived reply at the bottom of |
| 169 | + /// the thread identified by [parentMessageId]. |
| 170 | + /// |
| 171 | + /// Fires for every upstream that grows the thread, including |
| 172 | + /// server-confirmed replies, optimistic local sends, and any other |
| 173 | + /// update that appends to the tail of [threads]. |
| 174 | + /// |
| 175 | + /// A new arrival is detected when the bottom reply's id changes |
| 176 | + /// **and** its [Message.createdAt] is strictly after the previously |
| 177 | + /// observed tail. Edits, reactions, tail deletions, and pruning are |
| 178 | + /// therefore ignored. |
| 179 | + /// |
| 180 | + /// Threads load lazily, so the stream stays silent until [threads] |
| 181 | + /// carries replies for [parentMessageId]; that first snapshot seeds |
| 182 | + /// the baseline without yielding. |
| 183 | + Stream<Message> newThreadMessageStream(String parentMessageId) async* { |
| 184 | + final threadMessages = |
| 185 | + threadsStream.mapNotNull((it) => it[parentMessageId]); |
| 186 | + |
| 187 | + var lastSeen = threads[parentMessageId]?.lastOrNull; |
| 188 | + await for (final updated in threadMessages) { |
| 189 | + final newLast = updated.lastOrNull; |
| 190 | + if (newLast == null) { |
| 191 | + lastSeen = null; |
| 192 | + continue; |
| 193 | + } |
| 194 | + |
| 195 | + final isNewArrival = _isNewTailArrival(newLast, lastSeen); |
| 196 | + lastSeen = newLast; |
| 197 | + if (!isNewArrival) continue; |
| 198 | + yield newLast; |
| 199 | + } |
| 200 | + } |
| 201 | +} |
0 commit comments