diff --git a/packages/stream_chat_persistence/CHANGELOG.md b/packages/stream_chat_persistence/CHANGELOG.md index 31bbe53fe..ea8d98308 100644 --- a/packages/stream_chat_persistence/CHANGELOG.md +++ b/packages/stream_chat_persistence/CHANGELOG.md @@ -1,8 +1,22 @@ -# Upcoming +## Upcoming Changes 🚀 Performance - Reduce the number of DB reads in the `ChatPersistenceClient.getChannelStates` method. +- Read only the messages matching the `PaginationParams` from DB when calling `MessageDao.getMessagesByCid` instead of reading all messages for the channel and applying pagination in memory. +- Read only the thread replies matching the `PaginationParams` from DB when calling `MessageDao.getThreadMessagesByParentId` instead of reading all replies for the thread and applying pagination in memory. +- Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. +- Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. + +🐞 Fixed + +- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`. +- `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour. +- `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot. +- `MessageDao.getThreadMessagesByParentId` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor reply), in addition to the existing `lessThan`/`greaterThan`. +- `MessageDao.getThreadMessagesByParentId` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour. +- `MessageDao.getThreadMessagesByParentId` with a `limit` now returns the page of replies closest to the pivot (immediately before a `lessThan`/`lessThanOrEqual` cursor, immediately after a `greaterThan`/`greaterThanOrEqual` cursor, or the thread tail when no cursor is set), instead of always returning the oldest replies of the thread. + ## 9.24.0 diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index 77d84dff0..f295115e7 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -1,5 +1,3 @@ -import 'dart:math'; - import 'package:drift/drift.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; @@ -126,42 +124,89 @@ class MessageDao extends DatabaseAccessor String parentId, { PaginationParams? options, }) async { - final msgList = await Future.wait(await (select(messages).join([ + final ( + lessThanCursor, + lessThanOrEqualCursor, + greaterThanCursor, + greaterThanOrEqualCursor, + ) = await ( + _lookupThreadCursor(parentId, options?.lessThan), + _lookupThreadCursor(parentId, options?.lessThanOrEqual), + _lookupThreadCursor(parentId, options?.greaterThan), + _lookupThreadCursor(parentId, options?.greaterThanOrEqual), + ).wait; + + // When the caller is paginating forward (greaterThan / greaterThanOrEqual + // only), order ASC so the SQL `LIMIT` retains the N replies immediately + // AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N replies + // closest to a `lessThan` cursor (or the thread's tail when no cursor is + // set). The final result is always reshaped to ASC for display. + final isForwardPagination = + (greaterThanCursor != null || greaterThanOrEqualCursor != null) && + lessThanCursor == null && + lessThanOrEqualCursor == null; + + final orderBy = isForwardPagination + ? [ + OrderingTerm.asc(messages.createdAt), + OrderingTerm.asc(messages.id), + ] + : [ + OrderingTerm.desc(messages.createdAt), + OrderingTerm.desc(messages.id), + ]; + + final query = select(messages).join([ leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), leftOuterJoin( _pinnedByUsers, messages.pinnedByUserId.equalsExp(_pinnedByUsers.id), ), ]) - ..where(messages.parentId.isNotNull()) - ..where(messages.parentId.equals(parentId)) - ..orderBy([OrderingTerm.asc(messages.createdAt)])) - .map(_messageFromJoinRow) - .get()); - - if (msgList.isNotEmpty) { - if (options?.lessThan != null) { - final lessThanIndex = msgList.indexWhere( - (m) => m.id == options!.lessThan, - ); - if (lessThanIndex != -1) { - msgList.removeRange(lessThanIndex, msgList.length); - } - } - if (options?.greaterThan != null) { - final greaterThanIndex = msgList.indexWhere( - (m) => m.id == options!.greaterThan, - ); - if (greaterThanIndex != -1) { - msgList.removeRange(0, greaterThanIndex); - } - } - final limit = options?.limit; - if (limit != null && limit > 0) { - return msgList.take(limit).toList(); - } + ..where(messages.parentId.equals(parentId)) + ..orderBy(orderBy); + + // Cursor predicates compare the full `(createdAt, id)` tuple — the same + // key used in ORDER BY — so replies sharing a `createdAt` with the cursor + // fall on the correct side of the boundary. Filtering on `createdAt` + // alone would skip or repeat those siblings across pages. + if (lessThanCursor case final c?) { + query.where( + messages.createdAt.isSmallerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isSmallerThanValue(c.id)), + ); + } + if (lessThanOrEqualCursor case final c?) { + query.where( + messages.createdAt.isSmallerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isSmallerOrEqualValue(c.id)), + ); + } + if (greaterThanCursor case final c?) { + query.where( + messages.createdAt.isBiggerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isBiggerThanValue(c.id)), + ); } - return msgList; + if (greaterThanOrEqualCursor case final c?) { + query.where( + messages.createdAt.isBiggerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isBiggerOrEqualValue(c.id)), + ); + } + + if (options != null) { + query.limit(options.limit); + } + + final rows = await query.get(); + final orderedRows = isForwardPagination ? rows : rows.reversed.toList(); + + return Future.wait(orderedRows.map(_messageFromJoinRow)); } /// Returns all the messages of a channel by matching @@ -171,6 +216,38 @@ class MessageDao extends DatabaseAccessor bool fetchDraft = true, PaginationParams? messagePagination, }) async { + final ( + lessThanCursor, + lessThanOrEqualCursor, + greaterThanCursor, + greaterThanOrEqualCursor, + ) = await ( + _lookupCursor(messagePagination?.lessThan), + _lookupCursor(messagePagination?.lessThanOrEqual), + _lookupCursor(messagePagination?.greaterThan), + _lookupCursor(messagePagination?.greaterThanOrEqual), + ).wait; + + // When the caller is paginating forward (greaterThan / greaterThanOrEqual + // only), order ASC so the SQL `LIMIT` retains the N messages immediately + // AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N most + // recent (closest to a `lessThan` cursor, or the channel tail when no + // cursor is set). The final result is always reshaped to ASC for display. + final isForwardPagination = + (greaterThanCursor != null || greaterThanOrEqualCursor != null) && + lessThanCursor == null && + lessThanOrEqualCursor == null; + + final orderBy = isForwardPagination + ? [ + OrderingTerm.asc(messages.createdAt), + OrderingTerm.asc(messages.id), + ] + : [ + OrderingTerm.desc(messages.createdAt), + OrderingTerm.desc(messages.id), + ]; + final query = select(messages).join([ leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), leftOuterJoin( @@ -180,44 +257,52 @@ class MessageDao extends DatabaseAccessor ]) ..where(messages.channelCid.equals(cid)) ..where(messages.parentId.isNull() | messages.showInChannel.equals(true)) - ..orderBy([OrderingTerm.asc(messages.createdAt)]); + ..orderBy(orderBy); - final result = await query.get(); - if (result.isEmpty) return []; - - final msgList = await Future.wait( - result.map( - (row) => _messageFromJoinRow( - row, - fetchDraft: fetchDraft, - ), - ), - ); + // Cursor predicates compare the full `(createdAt, id)` tuple — the same + // key used in ORDER BY — so messages sharing a `createdAt` with the cursor + // fall on the correct side of the boundary. Filtering on `createdAt` alone + // would skip or repeat those siblings across pages. + if (lessThanCursor case final c?) { + query.where( + messages.createdAt.isSmallerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isSmallerThanValue(c.id)), + ); + } + if (lessThanOrEqualCursor case final c?) { + query.where( + messages.createdAt.isSmallerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isSmallerOrEqualValue(c.id)), + ); + } + if (greaterThanCursor case final c?) { + query.where( + messages.createdAt.isBiggerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isBiggerThanValue(c.id)), + ); + } + if (greaterThanOrEqualCursor case final c?) { + query.where( + messages.createdAt.isBiggerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isBiggerOrEqualValue(c.id)), + ); + } - if (msgList.isNotEmpty) { - if (messagePagination?.lessThan != null) { - final lessThanIndex = msgList.indexWhere( - (m) => m.id == messagePagination!.lessThan, - ); - if (lessThanIndex != -1) { - msgList.removeRange(lessThanIndex, msgList.length); - } - } - if (messagePagination?.greaterThan != null) { - final greaterThanIndex = msgList.indexWhere( - (m) => m.id == messagePagination!.greaterThan, - ); - if (greaterThanIndex != -1) { - msgList.removeRange(0, greaterThanIndex); - } - } - if (messagePagination?.limit != null) { - return msgList - .skip(max(0, msgList.length - messagePagination!.limit)) - .toList(); - } + if (messagePagination != null) { + query.limit(messagePagination.limit); } - return msgList; + + final rows = await query.get(); + final orderedRows = isForwardPagination ? rows : rows.reversed.toList(); + + return Future.wait( + orderedRows + .map((row) => _messageFromJoinRow(row, fetchDraft: fetchDraft)), + ); } /// Updates the message data of a particular channel with @@ -241,4 +326,40 @@ class MessageDao extends DatabaseAccessor (batch) => batch.insertAllOnConflictUpdate(messages, entities), ); } + + /// Returns the `(createdAt, id)` cursor for the message with [id] in the + /// local cache, or `null` if [id] is null, the message isn't cached, or + /// isn't visible in the channel (i.e. a thread reply with + /// `showInChannel = false`). + Future<({DateTime createdAt, String id})?> _lookupCursor(String? id) async { + if (id == null) return null; + final createdAt = await (selectOnly(messages) + ..addColumns([messages.createdAt]) + ..where(messages.id.equals(id)) + ..where( + messages.parentId.isNull() | messages.showInChannel.equals(true), + )) + .map((row) => row.read(messages.createdAt)) + .getSingleOrNull(); + if (createdAt == null) return null; + return (createdAt: createdAt, id: id); + } + + /// Returns the `(createdAt, id)` cursor for the thread reply with [id] + /// under [parentId] in the local cache, or `null` if [id] is null or no + /// such reply is cached. + Future<({DateTime createdAt, String id})?> _lookupThreadCursor( + String parentId, + String? id, + ) async { + if (id == null) return null; + final createdAt = await (selectOnly(messages) + ..addColumns([messages.createdAt]) + ..where(messages.id.equals(id)) + ..where(messages.parentId.equals(parentId))) + .map((row) => row.read(messages.createdAt)) + .getSingleOrNull(); + if (createdAt == null) return null; + return (createdAt: createdAt, id: id); + } } diff --git a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart index c5c5a6d45..6f5fdc6e6 100644 --- a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart @@ -16,17 +16,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor /// Returns all the reactions of a particular message by matching /// [Reactions.messageId] with [messageId] - Future> getReactions(String messageId) => - (select(pinnedMessageReactions).join([ - leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)), - ]) - ..where(pinnedMessageReactions.messageId.equals(messageId)) - ..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)])) - .map((rows) { - final userEntity = rows.readTableOrNull(users); - final reactionEntity = rows.readTable(pinnedMessageReactions); - return reactionEntity.toReaction(user: userEntity?.toUser()); - }).get(); + Future> getReactions(String messageId) { + final where = pinnedMessageReactions.messageId.equals(messageId); + return _selectReactions(where); + } /// Returns all the reactions of a particular message /// added by a particular user by matching @@ -35,9 +28,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor Future> getReactionsByUserId( String messageId, String userId, - ) async { - final reactions = await getReactions(messageId); - return reactions.where((it) => it.userId == userId).toList(); + ) { + final where = pinnedMessageReactions.messageId.equals(messageId) & + pinnedMessageReactions.userId.equals(userId); + return _selectReactions(where); } /// Updates the reactions data with the new [reactionList] data @@ -57,4 +51,17 @@ class PinnedMessageReactionDao extends DatabaseAccessor (r) => r.messageId.isIn(messageIds), ); }); + + Future> _selectReactions(Expression where) { + final rows = select(pinnedMessageReactions).join([ + leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)), + ]) + ..where(where) + ..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)]); + return rows.map((row) { + final reactionEntity = row.readTable(pinnedMessageReactions); + final userEntity = row.readTableOrNull(users); + return reactionEntity.toReaction(user: userEntity?.toUser()); + }).get(); + } } diff --git a/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart b/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart index d6dae9bd9..9d726bc8e 100644 --- a/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart @@ -16,17 +16,10 @@ class ReactionDao extends DatabaseAccessor /// Returns all the reactions of a particular message by matching /// [Reactions.messageId] with [messageId] - Future> getReactions(String messageId) => - (select(reactions).join([ - leftOuterJoin(users, reactions.userId.equalsExp(users.id)), - ]) - ..where(reactions.messageId.equals(messageId)) - ..orderBy([OrderingTerm.asc(reactions.createdAt)])) - .map((rows) { - final userEntity = rows.readTableOrNull(users); - final reactionEntity = rows.readTable(reactions); - return reactionEntity.toReaction(user: userEntity?.toUser()); - }).get(); + Future> getReactions(String messageId) { + final where = reactions.messageId.equals(messageId); + return _selectReactions(where); + } /// Returns all the reactions of a particular message /// added by a particular user by matching @@ -35,9 +28,10 @@ class ReactionDao extends DatabaseAccessor Future> getReactionsByUserId( String messageId, String userId, - ) async { - final reactions = await getReactions(messageId); - return reactions.where((it) => it.userId == userId).toList(); + ) { + final where = + reactions.messageId.equals(messageId) & reactions.userId.equals(userId); + return _selectReactions(where); } /// Updates the reactions data with the new [reactionList] data @@ -57,4 +51,16 @@ class ReactionDao extends DatabaseAccessor (r) => r.messageId.isIn(messageIds), ); }); + + Future> _selectReactions(Expression where) { + final rows = select(reactions) + .join([leftOuterJoin(users, reactions.userId.equalsExp(users.id))]) + ..where(where) + ..orderBy([OrderingTerm.asc(reactions.createdAt)]); + return rows.map((row) { + final reactionEntity = row.readTable(reactions); + final userEntity = row.readTableOrNull(users); + return reactionEntity.toReaction(user: userEntity?.toUser()); + }).get(); + } } diff --git a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart index 29cda8c93..65b98d827 100644 --- a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart @@ -25,6 +25,12 @@ void main() { }) async { final channels = [ChannelModel(cid: cid)]; final users = List.generate(count, (index) => User(id: 'testUserId$index')); + // Strictly monotonic `createdAt` per message so SQL-side pagination + // filters (`WHERE createdAt < cutoff`, `ORDER BY createdAt ASC`) can't be + // confused by ties. Drift stores `DateTime` as integer Unix seconds by + // default, so the offset must be at least 1 second per row — otherwise + // sub-second offsets all round-trip onto the same second. + final baseTime = DateTime.now(); final messages = List.generate( count, (index) => Message( @@ -32,7 +38,7 @@ void main() { type: 'testType', user: users[index], channelRole: 'channel_member', - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -59,7 +65,7 @@ void main() { type: 'testType', user: users[index], channelRole: 'channel_member', - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -85,7 +91,7 @@ void main() { channelRole: 'channel_member', parentId: mapAllThreadToFirstMessage ? messages[0].id : messages[index].id, - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -269,57 +275,241 @@ void main() { } }); - test('getThreadMessagesByParentId', () async { + group('getThreadMessagesByParentId', () { const cid = 'test:Cid'; const parentId = 'testMessageId${cid}0'; - - // Messages should be empty initially - final messages = await messageDao.getThreadMessagesByParentId(parentId); - expect(messages, isEmpty); - - // Preparing test data - final insertedMessages = await _prepareTestData(cid, threads: true); - expect(insertedMessages, isNotEmpty); - - // Should fetch all the thread messages of parentId - final threadMessages = - await messageDao.getThreadMessagesByParentId(parentId); - expect(threadMessages.length, 1); - expect(threadMessages.first.parentId, parentId); - }); - - test('getThreadMessagesByParentId along with pagination', () async { - const cid = 'test:Cid'; - const parentId = 'testMessageId${cid}0'; - const options = PaginationParams( - limit: 15, - lessThan: 'testThreadMessageId${cid}25', - greaterThanOrEqual: 'testThreadMessageId${cid}5', - ); - - // Messages should be empty initially - final messages = await messageDao.getThreadMessagesByParentId( - parentId, - options: options, - ); - expect(messages, isEmpty); - - // Preparing test data - final insertedMessages = await _prepareTestData( - cid, - threads: true, - mapAllThreadToFirstMessage: true, - count: 30, - ); - expect(insertedMessages, isNotEmpty); - - // Should fetch all the thread messages of parentId and apply the pagination - final threadMessages = await messageDao.getThreadMessagesByParentId( - parentId, - options: options, - ); - expect(threadMessages.length, 15); - expect(threadMessages.first.parentId, parentId); + String threadId(int i) => 'testThreadMessageId$cid$i'; + + test('getThreadMessagesByParentId', () async { + // Messages should be empty initially + final messages = await messageDao.getThreadMessagesByParentId(parentId); + expect(messages, isEmpty); + + // Preparing test data + final insertedMessages = await _prepareTestData(cid, threads: true); + expect(insertedMessages, isNotEmpty); + + // Should fetch all the thread messages of parentId + final threadMessages = + await messageDao.getThreadMessagesByParentId(parentId); + expect(threadMessages.length, 1); + expect(threadMessages.first.parentId, parentId); + }); + + test('getThreadMessagesByParentId along with pagination', () async { + const options = PaginationParams( + limit: 15, + lessThan: 'testThreadMessageId${cid}25', + greaterThan: 'testThreadMessageId${cid}5', + ); + + // Messages should be empty initially + final messages = await messageDao.getThreadMessagesByParentId( + parentId, + options: options, + ); + expect(messages, isEmpty); + + // Preparing test data + final insertedMessages = await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + expect(insertedMessages, isNotEmpty); + + // Should fetch all the thread messages of parentId and apply the + // pagination. + final threadMessages = await messageDao.getThreadMessagesByParentId( + parentId, + options: options, + ); + expect(threadMessages.length, 15); + expect(threadMessages.first.parentId, parentId); + // lessThan is set → backward pagination → DESC + LIMIT then reverse. + // Filter: id6..id24. Take 15 closest to id25 → id10..id24. + expect(threadMessages.first.id, 'testThreadMessageId${cid}10'); + expect(threadMessages.last.id, 'testThreadMessageId${cid}24'); + }); + + test('limit only returns the latest N replies', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams(limit: 5), + ); + + // No cursor → backward pagination → DESC + LIMIT then reversed. The + // result is the newest 5 replies (the tail of the thread), in ASC + // order: id25..id29. + expect(replies.length, 5); + expect(replies.first.id, threadId(25)); + expect(replies.last.id, threadId(29)); + }); + + test('lessThan only excludes the cursor', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(limit: 5, lessThan: threadId(25)), + ); + + // Strictly before id25 → id0..id24. Backward pagination keeps the 5 + // closest to the cursor → id20..id24. + expect(replies.length, 5); + expect(replies.first.id, threadId(20)); + expect(replies.last.id, threadId(24)); + }); + + test('lessThanOrEqual only includes the cursor', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(limit: 5, lessThanOrEqual: threadId(25)), + ); + + // Up to and including id25 → id0..id25. Backward pagination keeps the + // 5 closest to the cursor → id21..id25. + expect(replies.length, 5); + expect(replies.first.id, threadId(21)); + expect(replies.last.id, threadId(25)); + }); + + test('greaterThan only excludes the cursor', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(limit: 5, greaterThan: threadId(5)), + ); + + // Strictly after id5 → id6..id29, capped to 5 → id6..id10. + expect(replies.length, 5); + expect(replies.first.id, threadId(6)); + expect(replies.last.id, threadId(10)); + }); + + test('greaterThanOrEqual only includes the cursor', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(limit: 5, greaterThanOrEqual: threadId(5)), + ); + + // From id5 onwards → id5..id29, capped to 5 → id5..id9. + expect(replies.length, 5); + expect(replies.first.id, threadId(5)); + expect(replies.last.id, threadId(9)); + }); + + test('cursor with tied createdAt does not skip or duplicate siblings', + () async { + // Three replies share an identical `createdAt`. The SQL ORDER BY uses + // the `(createdAt, id)` tuple, so within the trio the relative order is + // by id (lexicographic). A cursor at `reply_tieB` must split the trio + // cleanly: `reply_tieA` lands on the "before" side, `reply_tieC` on the + // "after" side. A `createdAt`-only WHERE predicate would collapse all + // three into the cursor's bucket and drop or keep them together. + final users = [User(id: 'tieUser')]; + await database.userDao.updateUsers(users); + await database.channelDao.updateChannels([ChannelModel(cid: cid)]); + + final tie = DateTime.now(); + final earlier = tie.subtract(const Duration(seconds: 1)); + final later = tie.add(const Duration(seconds: 1)); + + Message parent() => Message( + id: parentId, + user: users.first, + createdAt: earlier, + updatedAt: earlier, + text: parentId, + ); + + Message reply(String id, DateTime t) => Message( + id: id, + user: users.first, + parentId: parentId, + createdAt: t, + updatedAt: t, + text: id, + ); + + await messageDao.updateMessages(cid, [ + parent(), + reply('reply_pre', earlier), + reply('reply_tieA', tie), + reply('reply_tieB', tie), + reply('reply_tieC', tie), + reply('reply_post', later), + ]); + + final before = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams(limit: 100, lessThan: 'reply_tieB'), + ); + expect(before.map((m) => m.id).toList(), ['reply_pre', 'reply_tieA']); + + final after = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams(limit: 100, greaterThan: 'reply_tieB'), + ); + expect(after.map((m) => m.id).toList(), ['reply_tieC', 'reply_post']); + + final atOrBefore = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams( + limit: 100, + lessThanOrEqual: 'reply_tieB', + ), + ); + expect( + atOrBefore.map((m) => m.id).toList(), + ['reply_pre', 'reply_tieA', 'reply_tieB'], + ); + + final atOrAfter = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams( + limit: 100, + greaterThanOrEqual: 'reply_tieB', + ), + ); + expect( + atOrAfter.map((m) => m.id).toList(), + ['reply_tieB', 'reply_tieC', 'reply_post'], + ); + }); }); test('getMessagesByCid', () async { @@ -365,11 +555,11 @@ void main() { const cid = 'test:Cid'; const limit = 15; const lessThan = 'testMessageId${cid}25'; - const greaterThanOrEqual = 'testMessageId${cid}5'; + const greaterThan = 'testMessageId${cid}5'; const pagination = PaginationParams( limit: limit, lessThan: lessThan, - greaterThanOrEqual: greaterThanOrEqual, + greaterThan: greaterThan, ); // Should be empty initially @@ -389,8 +579,294 @@ void main() { messagePagination: pagination, ); expect(fetchedMessages.length, limit); + expect(fetchedMessages.first.id, 'testMessageId${cid}10'); expect(fetchedMessages.last.id, 'testMessageId${cid}24'); - expect(fetchedMessages.first.id != lessThan, true); + }); + + group('getMessagesByCid pagination', () { + const cid = 'test:Cid'; + + test('lessThan only trims messages from the end', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 25); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}24'); + }); + + test('greaterThan only trims messages from the start (exclusive)', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 24); + expect(fetchedMessages.first.id, 'testMessageId${cid}6'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('limit only keeps the last N messages', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams(limit: 15), + ); + + expect(fetchedMessages.length, 15); + expect(fetchedMessages.first.id, 'testMessageId${cid}15'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('lessThan id not in result set is a no-op', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'missing-id', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('greaterThan id not in result set is a no-op', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'missing-id', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('thread-reply id as cursor is a no-op (not visible in channel)', + () async { + // `_prepareTestData` inserts thread replies with `parentId` set and + // `showInChannel` left null — i.e. not visible in the channel query. + // Passing such an id as a cursor must resolve to a no-op so the main + // query falls back to returning the full channel slice. + await _prepareTestData(cid, count: 30, threads: true); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'testThreadMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('default PaginationParams() applies implicit limit of 10', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams(), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}20'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('default limit + lessThan returns last 10 of filtered set', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + lessThan: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}15'); + expect(fetchedMessages.last.id, 'testMessageId${cid}24'); + }); + + test('default limit + greaterThan returns first 10 after the pivot', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + greaterThan: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}6'); + expect(fetchedMessages.last.id, 'testMessageId${cid}15'); + }); + + test('lessThanOrEqual is inclusive of the pivot', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThanOrEqual: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 26); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}25'); + }); + + test('greaterThanOrEqual is inclusive of the pivot', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThanOrEqual: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 25); + expect(fetchedMessages.first.id, 'testMessageId${cid}5'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('default limit + lessThanOrEqual returns the pivot and 9 before', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + lessThanOrEqual: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}16'); + expect(fetchedMessages.last.id, 'testMessageId${cid}25'); + }); + + test('default limit + greaterThanOrEqual returns the pivot and 9 after', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + greaterThanOrEqual: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}5'); + expect(fetchedMessages.last.id, 'testMessageId${cid}14'); + }); + + test('cursor with tied createdAt does not skip or duplicate siblings', + () async { + // Three messages share an identical `createdAt`. The SQL ORDER BY uses + // the `(createdAt, id)` tuple, so within the trio the relative order is + // by id (lexicographic). A cursor at `msg_tieB` must split the trio + // cleanly: `msg_tieA` lands on the "before" side, `msg_tieC` on the + // "after" side. A `createdAt`-only WHERE predicate would collapse all + // three into the cursor's bucket and drop or keep them together. + final users = [User(id: 'tieUser')]; + await database.userDao.updateUsers(users); + await database.channelDao.updateChannels([ChannelModel(cid: cid)]); + + final tie = DateTime.now(); + final earlier = tie.subtract(const Duration(seconds: 1)); + final later = tie.add(const Duration(seconds: 1)); + + Message m(String id, DateTime t) => Message( + id: id, + user: users.first, + createdAt: t, + updatedAt: t, + text: id, + ); + + await messageDao.updateMessages(cid, [ + m('msg_pre', earlier), + m('msg_tieA', tie), + m('msg_tieB', tie), + m('msg_tieC', tie), + m('msg_post', later), + ]); + + final before = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'msg_tieB', + ), + ); + expect(before.map((m) => m.id).toList(), ['msg_pre', 'msg_tieA']); + + final after = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'msg_tieB', + ), + ); + expect(after.map((m) => m.id).toList(), ['msg_tieC', 'msg_post']); + + final atOrBefore = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThanOrEqual: 'msg_tieB', + ), + ); + expect( + atOrBefore.map((m) => m.id).toList(), + ['msg_pre', 'msg_tieA', 'msg_tieB'], + ); + + final atOrAfter = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThanOrEqual: 'msg_tieB', + ), + ); + expect( + atOrAfter.map((m) => m.id).toList(), + ['msg_tieB', 'msg_tieC', 'msg_post'], + ); + }); }); test('updateMessages', () async { diff --git a/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart index 44abc4a64..640399ef8 100644 --- a/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart @@ -81,20 +81,27 @@ void main() { test('getReactionsByUserId', () async { const messageId = 'testMessageId'; const userId = 'testUserId'; + const otherUserId = 'otherUserid'; // Should be empty initially final reactions = await pinnedMessageReactionDao.getReactionsByUserId(messageId, userId); expect(reactions, isEmpty); - // Adding sample reactions + // Adding sample reactions from the target user. final insertedReactions = await _prepareReactionData(messageId, userId: userId); expect(insertedReactions, isNotEmpty); - // Fetched reaction length should match inserted reactions length. + // Adding sample reactions from other users on the same message. + final otherInsertedReactions = + await _prepareReactionData(messageId, userId: otherUserId); + expect(otherInsertedReactions, isNotEmpty); + + // Fetched reaction length should match the target user's reactions only. // Every reaction messageId should match the provided messageId. - // Every reaction userId should match the provided userId. + // Every reaction userId should match the provided userId — i.e. reactions + // from other users on the same message must be filtered out. final fetchedReactions = await pinnedMessageReactionDao.getReactionsByUserId(messageId, userId); expect(fetchedReactions.length, insertedReactions.length); diff --git a/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart index 7fa3569e4..38c0ff7d5 100644 --- a/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart @@ -80,19 +80,26 @@ void main() { test('getReactionsByUserId', () async { const messageId = 'testMessageId'; const userId = 'testUserId'; + const otherUserId = 'otherUserid'; // Should be empty initially final reactions = await reactionDao.getReactionsByUserId(messageId, userId); expect(reactions, isEmpty); - // Adding sample reactions + // Adding sample reactions from the target user. final insertedReactions = await _prepareReactionData(messageId, userId: userId); expect(insertedReactions, isNotEmpty); - // Fetched reaction length should match inserted reactions length. + // Adding sample reactions from other users on the same message. + final otherInsertedReactions = + await _prepareReactionData(messageId, userId: otherUserId); + expect(otherInsertedReactions, isNotEmpty); + + // Fetched reaction length should match the target user's reactions only. // Every reaction messageId should match the provided messageId. - // Every reaction userId should match the provided userId. + // Every reaction userId should match the provided userId — i.e. reactions + // from other users on the same message must be filtered out. final fetchedReactions = await reactionDao.getReactionsByUserId(messageId, userId); expect(fetchedReactions.length, insertedReactions.length);