From 88987e91b5cf49a42d8fc7dbcc2703b1471c87b6 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Tue, 19 May 2026 13:08:49 +0200 Subject: [PATCH 01/12] refactor(dao): Filter reactions by userId at DB level. --- .../lib/src/dao/reaction_dao.dart | 34 +++++++++++-------- .../test/src/dao/reaction_dao_test.dart | 13 +++++-- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart b/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart index d6dae9bd99..9d726bc8e3 100644 --- a/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart @@ -16,17 +16,10 @@ class ReactionDao extends DatabaseAccessor /// Returns all the reactions of a particular message by matching /// [Reactions.messageId] with [messageId] - Future> getReactions(String messageId) => - (select(reactions).join([ - leftOuterJoin(users, reactions.userId.equalsExp(users.id)), - ]) - ..where(reactions.messageId.equals(messageId)) - ..orderBy([OrderingTerm.asc(reactions.createdAt)])) - .map((rows) { - final userEntity = rows.readTableOrNull(users); - final reactionEntity = rows.readTable(reactions); - return reactionEntity.toReaction(user: userEntity?.toUser()); - }).get(); + Future> getReactions(String messageId) { + final where = reactions.messageId.equals(messageId); + return _selectReactions(where); + } /// Returns all the reactions of a particular message /// added by a particular user by matching @@ -35,9 +28,10 @@ class ReactionDao extends DatabaseAccessor Future> getReactionsByUserId( String messageId, String userId, - ) async { - final reactions = await getReactions(messageId); - return reactions.where((it) => it.userId == userId).toList(); + ) { + final where = + reactions.messageId.equals(messageId) & reactions.userId.equals(userId); + return _selectReactions(where); } /// Updates the reactions data with the new [reactionList] data @@ -57,4 +51,16 @@ class ReactionDao extends DatabaseAccessor (r) => r.messageId.isIn(messageIds), ); }); + + Future> _selectReactions(Expression where) { + final rows = select(reactions) + .join([leftOuterJoin(users, reactions.userId.equalsExp(users.id))]) + ..where(where) + ..orderBy([OrderingTerm.asc(reactions.createdAt)]); + return rows.map((row) { + final reactionEntity = row.readTable(reactions); + final userEntity = row.readTableOrNull(users); + return reactionEntity.toReaction(user: userEntity?.toUser()); + }).get(); + } } diff --git a/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart index 7fa3569e4e..38c0ff7d58 100644 --- a/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart @@ -80,19 +80,26 @@ void main() { test('getReactionsByUserId', () async { const messageId = 'testMessageId'; const userId = 'testUserId'; + const otherUserId = 'otherUserid'; // Should be empty initially final reactions = await reactionDao.getReactionsByUserId(messageId, userId); expect(reactions, isEmpty); - // Adding sample reactions + // Adding sample reactions from the target user. final insertedReactions = await _prepareReactionData(messageId, userId: userId); expect(insertedReactions, isNotEmpty); - // Fetched reaction length should match inserted reactions length. + // Adding sample reactions from other users on the same message. + final otherInsertedReactions = + await _prepareReactionData(messageId, userId: otherUserId); + expect(otherInsertedReactions, isNotEmpty); + + // Fetched reaction length should match the target user's reactions only. // Every reaction messageId should match the provided messageId. - // Every reaction userId should match the provided userId. + // Every reaction userId should match the provided userId — i.e. reactions + // from other users on the same message must be filtered out. final fetchedReactions = await reactionDao.getReactionsByUserId(messageId, userId); expect(fetchedReactions.length, insertedReactions.length); From fe4d4e28bc18568c79fd73638aca018a2af3c863 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Tue, 19 May 2026 14:41:49 +0200 Subject: [PATCH 02/12] refactor(dao): Filter reactions by userId at DB level (pinned messages). --- .../src/dao/pinned_message_reaction_dao.dart | 36 +++++++++++-------- .../dao/pinned_message_reaction_dao_test.dart | 13 +++++-- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart index c5c5a6d456..4ec053b436 100644 --- a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart @@ -16,17 +16,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor /// Returns all the reactions of a particular message by matching /// [Reactions.messageId] with [messageId] - Future> getReactions(String messageId) => - (select(pinnedMessageReactions).join([ - leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)), - ]) - ..where(pinnedMessageReactions.messageId.equals(messageId)) - ..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)])) - .map((rows) { - final userEntity = rows.readTableOrNull(users); - final reactionEntity = rows.readTable(pinnedMessageReactions); - return reactionEntity.toReaction(user: userEntity?.toUser()); - }).get(); + Future> getReactions(String messageId) { + final where = pinnedMessageReactions.messageId.equals(messageId); + return _selectReactions(where); + } /// Returns all the reactions of a particular message /// added by a particular user by matching @@ -34,10 +27,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor /// [Reactions.userId] with [userId] Future> getReactionsByUserId( String messageId, - String userId, - ) async { - final reactions = await getReactions(messageId); - return reactions.where((it) => it.userId == userId).toList(); + String userId,) { + final where = pinnedMessageReactions.messageId.equals( + messageId) & pinnedMessageReactions.userId.equals(userId); + return _selectReactions(where); } /// Updates the reactions data with the new [reactionList] data @@ -57,4 +50,17 @@ class PinnedMessageReactionDao extends DatabaseAccessor (r) => r.messageId.isIn(messageIds), ); }); + + Future> _selectReactions(Expression where) { + final rows = select(pinnedMessageReactions).join([ + leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)), + ]) + ..where(where) + ..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)]); + return rows.map((row) { + final reactionEntity = row.readTable(pinnedMessageReactions); + final userEntity = row.readTableOrNull(users); + return reactionEntity.toReaction(user: userEntity?.toUser()); + }).get(); + } } diff --git a/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart index 44abc4a644..640399ef87 100644 --- a/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart @@ -81,20 +81,27 @@ void main() { test('getReactionsByUserId', () async { const messageId = 'testMessageId'; const userId = 'testUserId'; + const otherUserId = 'otherUserid'; // Should be empty initially final reactions = await pinnedMessageReactionDao.getReactionsByUserId(messageId, userId); expect(reactions, isEmpty); - // Adding sample reactions + // Adding sample reactions from the target user. final insertedReactions = await _prepareReactionData(messageId, userId: userId); expect(insertedReactions, isNotEmpty); - // Fetched reaction length should match inserted reactions length. + // Adding sample reactions from other users on the same message. + final otherInsertedReactions = + await _prepareReactionData(messageId, userId: otherUserId); + expect(otherInsertedReactions, isNotEmpty); + + // Fetched reaction length should match the target user's reactions only. // Every reaction messageId should match the provided messageId. - // Every reaction userId should match the provided userId. + // Every reaction userId should match the provided userId — i.e. reactions + // from other users on the same message must be filtered out. final fetchedReactions = await pinnedMessageReactionDao.getReactionsByUserId(messageId, userId); expect(fetchedReactions.length, insertedReactions.length); From 466d9d3ee84ad498d64221bfe3ac496b88fbcf79 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Wed, 20 May 2026 12:01:43 +0200 Subject: [PATCH 03/12] refactor(dao): optimize message retrieval with SQL-side pagination filters --- .../lib/src/dao/message_dao.dart | 77 ++++----- .../test/src/dao/message_dao_test.dart | 147 +++++++++++++++++- 2 files changed, 180 insertions(+), 44 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index 77d84dff07..fdd84526fb 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -1,5 +1,3 @@ -import 'dart:math'; - import 'package:drift/drift.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; @@ -171,6 +169,15 @@ class MessageDao extends DatabaseAccessor bool fetchDraft = true, PaginationParams? messagePagination, }) async { + final lessThanCutoff = await switch (messagePagination?.lessThan) { + final id? => _lookupMessageCreatedAt(id), + _ => null, + }; + final greaterThanCutoff = await switch (messagePagination?.greaterThan) { + final id? => _lookupMessageCreatedAt(id), + _ => null, + }; + final query = select(messages).join([ leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), leftOuterJoin( @@ -180,44 +187,28 @@ class MessageDao extends DatabaseAccessor ]) ..where(messages.channelCid.equals(cid)) ..where(messages.parentId.isNull() | messages.showInChannel.equals(true)) - ..orderBy([OrderingTerm.asc(messages.createdAt)]); + ..orderBy([ + OrderingTerm.desc(messages.createdAt), + OrderingTerm.desc(messages.id), + ]); - final result = await query.get(); - if (result.isEmpty) return []; - - final msgList = await Future.wait( - result.map( - (row) => _messageFromJoinRow( - row, - fetchDraft: fetchDraft, - ), - ), - ); + if (lessThanCutoff case final t?) { + query.where(messages.createdAt.isSmallerThanValue(t)); + } + if (greaterThanCutoff case final t?) { + query.where(messages.createdAt.isBiggerOrEqualValue(t)); + } - if (msgList.isNotEmpty) { - if (messagePagination?.lessThan != null) { - final lessThanIndex = msgList.indexWhere( - (m) => m.id == messagePagination!.lessThan, - ); - if (lessThanIndex != -1) { - msgList.removeRange(lessThanIndex, msgList.length); - } - } - if (messagePagination?.greaterThan != null) { - final greaterThanIndex = msgList.indexWhere( - (m) => m.id == messagePagination!.greaterThan, - ); - if (greaterThanIndex != -1) { - msgList.removeRange(0, greaterThanIndex); - } - } - if (messagePagination?.limit != null) { - return msgList - .skip(max(0, msgList.length - messagePagination!.limit)) - .toList(); - } + if (messagePagination != null) { + query.limit(messagePagination.limit); } - return msgList; + + final rows = (await query.get()).reversed.toList(); + if (rows.isEmpty) return []; + + return Future.wait( + rows.map((row) => _messageFromJoinRow(row, fetchDraft: fetchDraft)), + ); } /// Updates the message data of a particular channel with @@ -241,4 +232,16 @@ class MessageDao extends DatabaseAccessor (batch) => batch.insertAllOnConflictUpdate(messages, entities), ); } + + /// Returns the `createdAt` of the message with [id] in the local cache, + /// or `null` if the message isn't cached. Used by [getMessagesByCid] to + /// resolve `lessThan` / `greaterThan` pagination cursors into SQL-comparable + /// timestamps so the filter runs in SQL instead of after the fact in Dart. + Future _lookupMessageCreatedAt(String id) { + return (selectOnly(messages) + ..addColumns([messages.createdAt]) + ..where(messages.id.equals(id))) + .map((row) => row.read(messages.createdAt)) + .getSingleOrNull(); + } } diff --git a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart index 29cda8c938..c4f2af1518 100644 --- a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart @@ -25,6 +25,12 @@ void main() { }) async { final channels = [ChannelModel(cid: cid)]; final users = List.generate(count, (index) => User(id: 'testUserId$index')); + // Strictly monotonic `createdAt` per message so SQL-side pagination + // filters (`WHERE createdAt < cutoff`, `ORDER BY createdAt ASC`) can't be + // confused by ties. Drift stores `DateTime` as integer Unix seconds by + // default, so the offset must be at least 1 second per row — otherwise + // sub-second offsets all round-trip onto the same second. + final baseTime = DateTime.now(); final messages = List.generate( count, (index) => Message( @@ -32,7 +38,7 @@ void main() { type: 'testType', user: users[index], channelRole: 'channel_member', - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -59,7 +65,7 @@ void main() { type: 'testType', user: users[index], channelRole: 'channel_member', - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -85,7 +91,7 @@ void main() { channelRole: 'channel_member', parentId: mapAllThreadToFirstMessage ? messages[0].id : messages[index].id, - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -294,7 +300,7 @@ void main() { const options = PaginationParams( limit: 15, lessThan: 'testThreadMessageId${cid}25', - greaterThanOrEqual: 'testThreadMessageId${cid}5', + greaterThan: 'testThreadMessageId${cid}5', ); // Messages should be empty initially @@ -320,6 +326,8 @@ void main() { ); expect(threadMessages.length, 15); expect(threadMessages.first.parentId, parentId); + expect(threadMessages.first.id, 'testThreadMessageId${cid}5'); + expect(threadMessages.last.id, 'testThreadMessageId${cid}19'); }); test('getMessagesByCid', () async { @@ -365,11 +373,11 @@ void main() { const cid = 'test:Cid'; const limit = 15; const lessThan = 'testMessageId${cid}25'; - const greaterThanOrEqual = 'testMessageId${cid}5'; + const greaterThan = 'testMessageId${cid}5'; const pagination = PaginationParams( limit: limit, lessThan: lessThan, - greaterThanOrEqual: greaterThanOrEqual, + greaterThan: greaterThan, ); // Should be empty initially @@ -389,8 +397,133 @@ void main() { messagePagination: pagination, ); expect(fetchedMessages.length, limit); + expect(fetchedMessages.first.id, 'testMessageId${cid}10'); expect(fetchedMessages.last.id, 'testMessageId${cid}24'); - expect(fetchedMessages.first.id != lessThan, true); + }); + + group('getMessagesByCid pagination', () { + const cid = 'test:Cid'; + + test('lessThan only trims messages from the end', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 25); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}24'); + }); + + test('greaterThan only trims messages from the start', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 25); + expect(fetchedMessages.first.id, 'testMessageId${cid}5'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('limit only keeps the last N messages', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams(limit: 15), + ); + + expect(fetchedMessages.length, 15); + expect(fetchedMessages.first.id, 'testMessageId${cid}15'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('lessThan id not in result set is a no-op', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'missing-id', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('greaterThan id not in result set is a no-op', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'missing-id', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('default PaginationParams() applies implicit limit of 10', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams(), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}20'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('default limit + lessThan returns last 10 of filtered set', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + lessThan: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}15'); + expect(fetchedMessages.last.id, 'testMessageId${cid}24'); + }); + + test('default limit + greaterThan returns last 10 of filtered set', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + greaterThan: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}20'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); }); test('updateMessages', () async { From e1ec8592f179501010b7e755b987a5a0fd73f314 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Thu, 21 May 2026 13:09:26 +0200 Subject: [PATCH 04/12] refactor(dao): fix formatting --- .../lib/src/dao/pinned_message_reaction_dao.dart | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart index 4ec053b436..6f5fdc6e68 100644 --- a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart @@ -27,9 +27,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor /// [Reactions.userId] with [userId] Future> getReactionsByUserId( String messageId, - String userId,) { - final where = pinnedMessageReactions.messageId.equals( - messageId) & pinnedMessageReactions.userId.equals(userId); + String userId, + ) { + final where = pinnedMessageReactions.messageId.equals(messageId) & + pinnedMessageReactions.userId.equals(userId); return _selectReactions(where); } From 468960622a1d24f0c0619cd3453a2687d1e9537c Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Thu, 21 May 2026 15:45:54 +0200 Subject: [PATCH 05/12] refactor(dao): Update CHANGELOG.md --- packages/stream_chat_persistence/CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/packages/stream_chat_persistence/CHANGELOG.md b/packages/stream_chat_persistence/CHANGELOG.md index d39ce5ba4c..147de6e493 100644 --- a/packages/stream_chat_persistence/CHANGELOG.md +++ b/packages/stream_chat_persistence/CHANGELOG.md @@ -1,3 +1,12 @@ +## Upcoming Changes + +🚀 Performance + +- Read only the messages matching the `PaginationParams` from DB when calling `MessageDao.getMessagesByCid` instead of reading all messages for the channel and applying pagination in memory. +- Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. +- Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. + + ## 9.24.0 - Updated `stream_chat` dependency to [`9.24.0`](https://pub.dev/packages/stream_chat/changelog). From edb86f7f66d734c2240d23d1bc0f1ab764c0da48 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Thu, 21 May 2026 16:12:20 +0200 Subject: [PATCH 06/12] refactor(dao): Apply formatting --- packages/stream_chat_persistence/lib/src/dao/message_dao.dart | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index fdd84526fb..2aed79cda5 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -239,8 +239,8 @@ class MessageDao extends DatabaseAccessor /// timestamps so the filter runs in SQL instead of after the fact in Dart. Future _lookupMessageCreatedAt(String id) { return (selectOnly(messages) - ..addColumns([messages.createdAt]) - ..where(messages.id.equals(id))) + ..addColumns([messages.createdAt]) + ..where(messages.id.equals(id))) .map((row) => row.read(messages.createdAt)) .getSingleOrNull(); } From b580ccdc473fc06e537b622915aa7600e4ece87d Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Mon, 25 May 2026 11:33:46 +0200 Subject: [PATCH 07/12] refactor(dao): enhance message pagination logic to support inclusive cursors --- packages/stream_chat_persistence/CHANGELOG.md | 9 ++ .../lib/src/dao/message_dao.dart | 81 ++++++++++++---- .../test/src/dao/message_dao_test.dart | 96 ++++++++++++++++++- 3 files changed, 162 insertions(+), 24 deletions(-) diff --git a/packages/stream_chat_persistence/CHANGELOG.md b/packages/stream_chat_persistence/CHANGELOG.md index 147de6e493..74113f71f4 100644 --- a/packages/stream_chat_persistence/CHANGELOG.md +++ b/packages/stream_chat_persistence/CHANGELOG.md @@ -6,6 +6,15 @@ - Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. - Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. +✅ Added + +- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`. + +🐞 Fixed + +- `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour. +- `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot. + ## 9.24.0 diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index 2aed79cda5..f1396c786c 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -169,14 +169,49 @@ class MessageDao extends DatabaseAccessor bool fetchDraft = true, PaginationParams? messagePagination, }) async { - final lessThanCutoff = await switch (messagePagination?.lessThan) { - final id? => _lookupMessageCreatedAt(id), - _ => null, - }; - final greaterThanCutoff = await switch (messagePagination?.greaterThan) { - final id? => _lookupMessageCreatedAt(id), - _ => null, - }; + final ( + lessThanCutoff, + lessThanOrEqualCutoff, + greaterThanCutoff, + greaterThanOrEqualCutoff, + ) = await ( + switch (messagePagination?.lessThan) { + final id? => _lookupMessageCreatedAt(id), + _ => Future.value(), + }, + switch (messagePagination?.lessThanOrEqual) { + final id? => _lookupMessageCreatedAt(id), + _ => Future.value(), + }, + switch (messagePagination?.greaterThan) { + final id? => _lookupMessageCreatedAt(id), + _ => Future.value(), + }, + switch (messagePagination?.greaterThanOrEqual) { + final id? => _lookupMessageCreatedAt(id), + _ => Future.value(), + }, + ).wait; + + // When the caller is paginating forward (greaterThan / greaterThanOrEqual + // only), order ASC so the SQL `LIMIT` retains the N messages immediately + // AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N most + // recent (closest to a `lessThan` cursor, or the channel tail when no + // cursor is set). The final result is always reshaped to ASC for display. + final isForwardPagination = + (greaterThanCutoff != null || greaterThanOrEqualCutoff != null) && + lessThanCutoff == null && + lessThanOrEqualCutoff == null; + + final orderBy = isForwardPagination + ? [ + OrderingTerm.asc(messages.createdAt), + OrderingTerm.asc(messages.id), + ] + : [ + OrderingTerm.desc(messages.createdAt), + OrderingTerm.desc(messages.id), + ]; final query = select(messages).join([ leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), @@ -187,15 +222,18 @@ class MessageDao extends DatabaseAccessor ]) ..where(messages.channelCid.equals(cid)) ..where(messages.parentId.isNull() | messages.showInChannel.equals(true)) - ..orderBy([ - OrderingTerm.desc(messages.createdAt), - OrderingTerm.desc(messages.id), - ]); + ..orderBy(orderBy); if (lessThanCutoff case final t?) { query.where(messages.createdAt.isSmallerThanValue(t)); } + if (lessThanOrEqualCutoff case final t?) { + query.where(messages.createdAt.isSmallerOrEqualValue(t)); + } if (greaterThanCutoff case final t?) { + query.where(messages.createdAt.isBiggerThanValue(t)); + } + if (greaterThanOrEqualCutoff case final t?) { query.where(messages.createdAt.isBiggerOrEqualValue(t)); } @@ -203,11 +241,12 @@ class MessageDao extends DatabaseAccessor query.limit(messagePagination.limit); } - final rows = (await query.get()).reversed.toList(); - if (rows.isEmpty) return []; + final rows = await query.get(); + final orderedRows = isForwardPagination ? rows : rows.reversed.toList(); return Future.wait( - rows.map((row) => _messageFromJoinRow(row, fetchDraft: fetchDraft)), + orderedRows + .map((row) => _messageFromJoinRow(row, fetchDraft: fetchDraft)), ); } @@ -234,13 +273,17 @@ class MessageDao extends DatabaseAccessor } /// Returns the `createdAt` of the message with [id] in the local cache, - /// or `null` if the message isn't cached. Used by [getMessagesByCid] to - /// resolve `lessThan` / `greaterThan` pagination cursors into SQL-comparable - /// timestamps so the filter runs in SQL instead of after the fact in Dart. + /// or `null` if the message isn't cached or isn't visible in the channel + /// (i.e. a thread reply with `showInChannel = false`). The visibility + /// predicate mirrors the main [getMessagesByCid] query so a hidden cursor + /// behaves as a no-op, matching the pre-SQL-filter behaviour. Future _lookupMessageCreatedAt(String id) { return (selectOnly(messages) ..addColumns([messages.createdAt]) - ..where(messages.id.equals(id))) + ..where(messages.id.equals(id)) + ..where( + messages.parentId.isNull() | messages.showInChannel.equals(true), + )) .map((row) => row.read(messages.createdAt)) .getSingleOrNull(); } diff --git a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart index c4f2af1518..7b8d69a05c 100644 --- a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart @@ -420,7 +420,8 @@ void main() { expect(fetchedMessages.last.id, 'testMessageId${cid}24'); }); - test('greaterThan only trims messages from the start', () async { + test('greaterThan only trims messages from the start (exclusive)', + () async { await _prepareTestData(cid, count: 30); final fetchedMessages = await messageDao.getMessagesByCid( @@ -431,8 +432,8 @@ void main() { ), ); - expect(fetchedMessages.length, 25); - expect(fetchedMessages.first.id, 'testMessageId${cid}5'); + expect(fetchedMessages.length, 24); + expect(fetchedMessages.first.id, 'testMessageId${cid}6'); expect(fetchedMessages.last.id, 'testMessageId${cid}29'); }); @@ -481,6 +482,27 @@ void main() { expect(fetchedMessages.last.id, 'testMessageId${cid}29'); }); + test('thread-reply id as cursor is a no-op (not visible in channel)', + () async { + // `_prepareTestData` inserts thread replies with `parentId` set and + // `showInChannel` left null — i.e. not visible in the channel query. + // Passing such an id as a cursor must resolve to a no-op so the main + // query falls back to returning the full channel slice. + await _prepareTestData(cid, count: 30, threads: true); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'testThreadMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + test('default PaginationParams() applies implicit limit of 10', () async { await _prepareTestData(cid, count: 30); @@ -509,7 +531,7 @@ void main() { expect(fetchedMessages.last.id, 'testMessageId${cid}24'); }); - test('default limit + greaterThan returns last 10 of filtered set', + test('default limit + greaterThan returns first 10 after the pivot', () async { await _prepareTestData(cid, count: 30); @@ -521,9 +543,73 @@ void main() { ); expect(fetchedMessages.length, 10); - expect(fetchedMessages.first.id, 'testMessageId${cid}20'); + expect(fetchedMessages.first.id, 'testMessageId${cid}6'); + expect(fetchedMessages.last.id, 'testMessageId${cid}15'); + }); + + test('lessThanOrEqual is inclusive of the pivot', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThanOrEqual: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 26); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}25'); + }); + + test('greaterThanOrEqual is inclusive of the pivot', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThanOrEqual: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 25); + expect(fetchedMessages.first.id, 'testMessageId${cid}5'); expect(fetchedMessages.last.id, 'testMessageId${cid}29'); }); + + test('default limit + lessThanOrEqual returns the pivot and 9 before', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + lessThanOrEqual: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}16'); + expect(fetchedMessages.last.id, 'testMessageId${cid}25'); + }); + + test('default limit + greaterThanOrEqual returns the pivot and 9 after', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + greaterThanOrEqual: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}5'); + expect(fetchedMessages.last.id, 'testMessageId${cid}14'); + }); }); test('updateMessages', () async { From 52f0c265685f52b0d75982eccd183ef7d80e9a92 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Mon, 25 May 2026 11:35:05 +0200 Subject: [PATCH 08/12] refactor(dao): Update docs --- packages/stream_chat_persistence/lib/src/dao/message_dao.dart | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index f1396c786c..6d53bdf171 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -274,9 +274,7 @@ class MessageDao extends DatabaseAccessor /// Returns the `createdAt` of the message with [id] in the local cache, /// or `null` if the message isn't cached or isn't visible in the channel - /// (i.e. a thread reply with `showInChannel = false`). The visibility - /// predicate mirrors the main [getMessagesByCid] query so a hidden cursor - /// behaves as a no-op, matching the pre-SQL-filter behaviour. + /// (i.e. a thread reply with `showInChannel = false`). Future _lookupMessageCreatedAt(String id) { return (selectOnly(messages) ..addColumns([messages.createdAt]) From 2e719f4c1eae81b2e66be248df64f480c501f522 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Mon, 25 May 2026 23:36:49 +0200 Subject: [PATCH 09/12] refactor(dao): Add message.id tiebreakier --- .../lib/src/dao/message_dao.dart | 84 +++++++++++-------- .../test/src/dao/message_dao_test.dart | 76 +++++++++++++++++ 2 files changed, 124 insertions(+), 36 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index 6d53bdf171..e640173ae9 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -170,27 +170,15 @@ class MessageDao extends DatabaseAccessor PaginationParams? messagePagination, }) async { final ( - lessThanCutoff, - lessThanOrEqualCutoff, - greaterThanCutoff, - greaterThanOrEqualCutoff, + lessThanCursor, + lessThanOrEqualCursor, + greaterThanCursor, + greaterThanOrEqualCursor, ) = await ( - switch (messagePagination?.lessThan) { - final id? => _lookupMessageCreatedAt(id), - _ => Future.value(), - }, - switch (messagePagination?.lessThanOrEqual) { - final id? => _lookupMessageCreatedAt(id), - _ => Future.value(), - }, - switch (messagePagination?.greaterThan) { - final id? => _lookupMessageCreatedAt(id), - _ => Future.value(), - }, - switch (messagePagination?.greaterThanOrEqual) { - final id? => _lookupMessageCreatedAt(id), - _ => Future.value(), - }, + _lookupCursor(messagePagination?.lessThan), + _lookupCursor(messagePagination?.lessThanOrEqual), + _lookupCursor(messagePagination?.greaterThan), + _lookupCursor(messagePagination?.greaterThanOrEqual), ).wait; // When the caller is paginating forward (greaterThan / greaterThanOrEqual @@ -199,9 +187,9 @@ class MessageDao extends DatabaseAccessor // recent (closest to a `lessThan` cursor, or the channel tail when no // cursor is set). The final result is always reshaped to ASC for display. final isForwardPagination = - (greaterThanCutoff != null || greaterThanOrEqualCutoff != null) && - lessThanCutoff == null && - lessThanOrEqualCutoff == null; + (greaterThanCursor != null || greaterThanOrEqualCursor != null) && + lessThanCursor == null && + lessThanOrEqualCursor == null; final orderBy = isForwardPagination ? [ @@ -224,17 +212,37 @@ class MessageDao extends DatabaseAccessor ..where(messages.parentId.isNull() | messages.showInChannel.equals(true)) ..orderBy(orderBy); - if (lessThanCutoff case final t?) { - query.where(messages.createdAt.isSmallerThanValue(t)); + // Cursor predicates compare the full `(createdAt, id)` tuple — the same + // key used in ORDER BY — so messages sharing a `createdAt` with the cursor + // fall on the correct side of the boundary. Filtering on `createdAt` alone + // would skip or repeat those siblings across pages. + if (lessThanCursor case final c?) { + query.where( + messages.createdAt.isSmallerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isSmallerThanValue(c.id)), + ); } - if (lessThanOrEqualCutoff case final t?) { - query.where(messages.createdAt.isSmallerOrEqualValue(t)); + if (lessThanOrEqualCursor case final c?) { + query.where( + messages.createdAt.isSmallerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isSmallerOrEqualValue(c.id)), + ); } - if (greaterThanCutoff case final t?) { - query.where(messages.createdAt.isBiggerThanValue(t)); + if (greaterThanCursor case final c?) { + query.where( + messages.createdAt.isBiggerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isBiggerThanValue(c.id)), + ); } - if (greaterThanOrEqualCutoff case final t?) { - query.where(messages.createdAt.isBiggerOrEqualValue(t)); + if (greaterThanOrEqualCursor case final c?) { + query.where( + messages.createdAt.isBiggerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isBiggerOrEqualValue(c.id)), + ); } if (messagePagination != null) { @@ -272,11 +280,13 @@ class MessageDao extends DatabaseAccessor ); } - /// Returns the `createdAt` of the message with [id] in the local cache, - /// or `null` if the message isn't cached or isn't visible in the channel - /// (i.e. a thread reply with `showInChannel = false`). - Future _lookupMessageCreatedAt(String id) { - return (selectOnly(messages) + /// Returns the `(createdAt, id)` cursor for the message with [id] in the + /// local cache, or `null` if [id] is null, the message isn't cached, or + /// isn't visible in the channel (i.e. a thread reply with + /// `showInChannel = false`). + Future<({DateTime createdAt, String id})?> _lookupCursor(String? id) async { + if (id == null) return null; + final createdAt = await (selectOnly(messages) ..addColumns([messages.createdAt]) ..where(messages.id.equals(id)) ..where( @@ -284,5 +294,7 @@ class MessageDao extends DatabaseAccessor )) .map((row) => row.read(messages.createdAt)) .getSingleOrNull(); + if (createdAt == null) return null; + return (createdAt: createdAt, id: id); } } diff --git a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart index 7b8d69a05c..93a9d06dd8 100644 --- a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart @@ -610,6 +610,82 @@ void main() { expect(fetchedMessages.first.id, 'testMessageId${cid}5'); expect(fetchedMessages.last.id, 'testMessageId${cid}14'); }); + + test('cursor with tied createdAt does not skip or duplicate siblings', + () async { + // Three messages share an identical `createdAt`. The SQL ORDER BY uses + // the `(createdAt, id)` tuple, so within the trio the relative order is + // by id (lexicographic). A cursor at `msg_tieB` must split the trio + // cleanly: `msg_tieA` lands on the "before" side, `msg_tieC` on the + // "after" side. A `createdAt`-only WHERE predicate would collapse all + // three into the cursor's bucket and drop or keep them together. + final users = [User(id: 'tieUser')]; + await database.userDao.updateUsers(users); + await database.channelDao.updateChannels([ChannelModel(cid: cid)]); + + final tie = DateTime.now(); + final earlier = tie.subtract(const Duration(seconds: 1)); + final later = tie.add(const Duration(seconds: 1)); + + Message m(String id, DateTime t) => Message( + id: id, + type: 'regular', + user: users.first, + createdAt: t, + updatedAt: t, + text: id, + ); + + await messageDao.updateMessages(cid, [ + m('msg_pre', earlier), + m('msg_tieA', tie), + m('msg_tieB', tie), + m('msg_tieC', tie), + m('msg_post', later), + ]); + + final before = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'msg_tieB', + ), + ); + expect(before.map((m) => m.id).toList(), ['msg_pre', 'msg_tieA']); + + final after = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'msg_tieB', + ), + ); + expect(after.map((m) => m.id).toList(), ['msg_tieC', 'msg_post']); + + final atOrBefore = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThanOrEqual: 'msg_tieB', + ), + ); + expect( + atOrBefore.map((m) => m.id).toList(), + ['msg_pre', 'msg_tieA', 'msg_tieB'], + ); + + final atOrAfter = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThanOrEqual: 'msg_tieB', + ), + ); + expect( + atOrAfter.map((m) => m.id).toList(), + ['msg_tieB', 'msg_tieC', 'msg_post'], + ); + }); }); test('updateMessages', () async { From 35569a0739ab7e817b7e7ff28e5c4a35b0c66549 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Mon, 25 May 2026 23:38:33 +0200 Subject: [PATCH 10/12] refactor(dao): Add message.id tiebreaker --- .../stream_chat_persistence/test/src/dao/message_dao_test.dart | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart index 93a9d06dd8..7fe81432b4 100644 --- a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart @@ -629,7 +629,6 @@ void main() { Message m(String id, DateTime t) => Message( id: id, - type: 'regular', user: users.first, createdAt: t, updatedAt: t, From fa6898e81c420d187f0589108ff87f0c77a2d09d Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Mon, 25 May 2026 23:42:51 +0200 Subject: [PATCH 11/12] refactor(dao): Update CHANGELOG.md --- packages/stream_chat_persistence/CHANGELOG.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/stream_chat_persistence/CHANGELOG.md b/packages/stream_chat_persistence/CHANGELOG.md index a4388ca94d..8ff1b9bb00 100644 --- a/packages/stream_chat_persistence/CHANGELOG.md +++ b/packages/stream_chat_persistence/CHANGELOG.md @@ -7,12 +7,9 @@ - Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. - Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. -✅ Added - -- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`. - 🐞 Fixed +- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`. - `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour. - `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot. From 69e1cc9b12f1d7beb43a6a9bc2924e1610c65e56 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Tue, 26 May 2026 14:14:41 +0200 Subject: [PATCH 12/12] refactor(dao): Move thread reply pagination to SQL --- packages/stream_chat_persistence/CHANGELOG.md | 4 + .../lib/src/dao/message_dao.dart | 123 ++++++-- .../test/src/dao/message_dao_test.dart | 270 +++++++++++++++--- 3 files changed, 324 insertions(+), 73 deletions(-) diff --git a/packages/stream_chat_persistence/CHANGELOG.md b/packages/stream_chat_persistence/CHANGELOG.md index 8ff1b9bb00..ea8d983080 100644 --- a/packages/stream_chat_persistence/CHANGELOG.md +++ b/packages/stream_chat_persistence/CHANGELOG.md @@ -4,6 +4,7 @@ - Reduce the number of DB reads in the `ChatPersistenceClient.getChannelStates` method. - Read only the messages matching the `PaginationParams` from DB when calling `MessageDao.getMessagesByCid` instead of reading all messages for the channel and applying pagination in memory. +- Read only the thread replies matching the `PaginationParams` from DB when calling `MessageDao.getThreadMessagesByParentId` instead of reading all replies for the thread and applying pagination in memory. - Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. - Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. @@ -12,6 +13,9 @@ - `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`. - `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour. - `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot. +- `MessageDao.getThreadMessagesByParentId` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor reply), in addition to the existing `lessThan`/`greaterThan`. +- `MessageDao.getThreadMessagesByParentId` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour. +- `MessageDao.getThreadMessagesByParentId` with a `limit` now returns the page of replies closest to the pivot (immediately before a `lessThan`/`lessThanOrEqual` cursor, immediately after a `greaterThan`/`greaterThanOrEqual` cursor, or the thread tail when no cursor is set), instead of always returning the oldest replies of the thread. ## 9.24.0 diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index e640173ae9..f295115e73 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -124,42 +124,89 @@ class MessageDao extends DatabaseAccessor String parentId, { PaginationParams? options, }) async { - final msgList = await Future.wait(await (select(messages).join([ + final ( + lessThanCursor, + lessThanOrEqualCursor, + greaterThanCursor, + greaterThanOrEqualCursor, + ) = await ( + _lookupThreadCursor(parentId, options?.lessThan), + _lookupThreadCursor(parentId, options?.lessThanOrEqual), + _lookupThreadCursor(parentId, options?.greaterThan), + _lookupThreadCursor(parentId, options?.greaterThanOrEqual), + ).wait; + + // When the caller is paginating forward (greaterThan / greaterThanOrEqual + // only), order ASC so the SQL `LIMIT` retains the N replies immediately + // AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N replies + // closest to a `lessThan` cursor (or the thread's tail when no cursor is + // set). The final result is always reshaped to ASC for display. + final isForwardPagination = + (greaterThanCursor != null || greaterThanOrEqualCursor != null) && + lessThanCursor == null && + lessThanOrEqualCursor == null; + + final orderBy = isForwardPagination + ? [ + OrderingTerm.asc(messages.createdAt), + OrderingTerm.asc(messages.id), + ] + : [ + OrderingTerm.desc(messages.createdAt), + OrderingTerm.desc(messages.id), + ]; + + final query = select(messages).join([ leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), leftOuterJoin( _pinnedByUsers, messages.pinnedByUserId.equalsExp(_pinnedByUsers.id), ), ]) - ..where(messages.parentId.isNotNull()) - ..where(messages.parentId.equals(parentId)) - ..orderBy([OrderingTerm.asc(messages.createdAt)])) - .map(_messageFromJoinRow) - .get()); - - if (msgList.isNotEmpty) { - if (options?.lessThan != null) { - final lessThanIndex = msgList.indexWhere( - (m) => m.id == options!.lessThan, - ); - if (lessThanIndex != -1) { - msgList.removeRange(lessThanIndex, msgList.length); - } - } - if (options?.greaterThan != null) { - final greaterThanIndex = msgList.indexWhere( - (m) => m.id == options!.greaterThan, - ); - if (greaterThanIndex != -1) { - msgList.removeRange(0, greaterThanIndex); - } - } - final limit = options?.limit; - if (limit != null && limit > 0) { - return msgList.take(limit).toList(); - } + ..where(messages.parentId.equals(parentId)) + ..orderBy(orderBy); + + // Cursor predicates compare the full `(createdAt, id)` tuple — the same + // key used in ORDER BY — so replies sharing a `createdAt` with the cursor + // fall on the correct side of the boundary. Filtering on `createdAt` + // alone would skip or repeat those siblings across pages. + if (lessThanCursor case final c?) { + query.where( + messages.createdAt.isSmallerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isSmallerThanValue(c.id)), + ); } - return msgList; + if (lessThanOrEqualCursor case final c?) { + query.where( + messages.createdAt.isSmallerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isSmallerOrEqualValue(c.id)), + ); + } + if (greaterThanCursor case final c?) { + query.where( + messages.createdAt.isBiggerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isBiggerThanValue(c.id)), + ); + } + if (greaterThanOrEqualCursor case final c?) { + query.where( + messages.createdAt.isBiggerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isBiggerOrEqualValue(c.id)), + ); + } + + if (options != null) { + query.limit(options.limit); + } + + final rows = await query.get(); + final orderedRows = isForwardPagination ? rows : rows.reversed.toList(); + + return Future.wait(orderedRows.map(_messageFromJoinRow)); } /// Returns all the messages of a channel by matching @@ -297,4 +344,22 @@ class MessageDao extends DatabaseAccessor if (createdAt == null) return null; return (createdAt: createdAt, id: id); } + + /// Returns the `(createdAt, id)` cursor for the thread reply with [id] + /// under [parentId] in the local cache, or `null` if [id] is null or no + /// such reply is cached. + Future<({DateTime createdAt, String id})?> _lookupThreadCursor( + String parentId, + String? id, + ) async { + if (id == null) return null; + final createdAt = await (selectOnly(messages) + ..addColumns([messages.createdAt]) + ..where(messages.id.equals(id)) + ..where(messages.parentId.equals(parentId))) + .map((row) => row.read(messages.createdAt)) + .getSingleOrNull(); + if (createdAt == null) return null; + return (createdAt: createdAt, id: id); + } } diff --git a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart index 7fe81432b4..65b98d827d 100644 --- a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart @@ -275,59 +275,241 @@ void main() { } }); - test('getThreadMessagesByParentId', () async { + group('getThreadMessagesByParentId', () { const cid = 'test:Cid'; const parentId = 'testMessageId${cid}0'; + String threadId(int i) => 'testThreadMessageId$cid$i'; + + test('getThreadMessagesByParentId', () async { + // Messages should be empty initially + final messages = await messageDao.getThreadMessagesByParentId(parentId); + expect(messages, isEmpty); + + // Preparing test data + final insertedMessages = await _prepareTestData(cid, threads: true); + expect(insertedMessages, isNotEmpty); + + // Should fetch all the thread messages of parentId + final threadMessages = + await messageDao.getThreadMessagesByParentId(parentId); + expect(threadMessages.length, 1); + expect(threadMessages.first.parentId, parentId); + }); - // Messages should be empty initially - final messages = await messageDao.getThreadMessagesByParentId(parentId); - expect(messages, isEmpty); + test('getThreadMessagesByParentId along with pagination', () async { + const options = PaginationParams( + limit: 15, + lessThan: 'testThreadMessageId${cid}25', + greaterThan: 'testThreadMessageId${cid}5', + ); - // Preparing test data - final insertedMessages = await _prepareTestData(cid, threads: true); - expect(insertedMessages, isNotEmpty); + // Messages should be empty initially + final messages = await messageDao.getThreadMessagesByParentId( + parentId, + options: options, + ); + expect(messages, isEmpty); - // Should fetch all the thread messages of parentId - final threadMessages = - await messageDao.getThreadMessagesByParentId(parentId); - expect(threadMessages.length, 1); - expect(threadMessages.first.parentId, parentId); - }); + // Preparing test data + final insertedMessages = await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + expect(insertedMessages, isNotEmpty); - test('getThreadMessagesByParentId along with pagination', () async { - const cid = 'test:Cid'; - const parentId = 'testMessageId${cid}0'; - const options = PaginationParams( - limit: 15, - lessThan: 'testThreadMessageId${cid}25', - greaterThan: 'testThreadMessageId${cid}5', - ); + // Should fetch all the thread messages of parentId and apply the + // pagination. + final threadMessages = await messageDao.getThreadMessagesByParentId( + parentId, + options: options, + ); + expect(threadMessages.length, 15); + expect(threadMessages.first.parentId, parentId); + // lessThan is set → backward pagination → DESC + LIMIT then reverse. + // Filter: id6..id24. Take 15 closest to id25 → id10..id24. + expect(threadMessages.first.id, 'testThreadMessageId${cid}10'); + expect(threadMessages.last.id, 'testThreadMessageId${cid}24'); + }); - // Messages should be empty initially - final messages = await messageDao.getThreadMessagesByParentId( - parentId, - options: options, - ); - expect(messages, isEmpty); + test('limit only returns the latest N replies', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); - // Preparing test data - final insertedMessages = await _prepareTestData( - cid, - threads: true, - mapAllThreadToFirstMessage: true, - count: 30, - ); - expect(insertedMessages, isNotEmpty); + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams(limit: 5), + ); - // Should fetch all the thread messages of parentId and apply the pagination - final threadMessages = await messageDao.getThreadMessagesByParentId( - parentId, - options: options, - ); - expect(threadMessages.length, 15); - expect(threadMessages.first.parentId, parentId); - expect(threadMessages.first.id, 'testThreadMessageId${cid}5'); - expect(threadMessages.last.id, 'testThreadMessageId${cid}19'); + // No cursor → backward pagination → DESC + LIMIT then reversed. The + // result is the newest 5 replies (the tail of the thread), in ASC + // order: id25..id29. + expect(replies.length, 5); + expect(replies.first.id, threadId(25)); + expect(replies.last.id, threadId(29)); + }); + + test('lessThan only excludes the cursor', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(limit: 5, lessThan: threadId(25)), + ); + + // Strictly before id25 → id0..id24. Backward pagination keeps the 5 + // closest to the cursor → id20..id24. + expect(replies.length, 5); + expect(replies.first.id, threadId(20)); + expect(replies.last.id, threadId(24)); + }); + + test('lessThanOrEqual only includes the cursor', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(limit: 5, lessThanOrEqual: threadId(25)), + ); + + // Up to and including id25 → id0..id25. Backward pagination keeps the + // 5 closest to the cursor → id21..id25. + expect(replies.length, 5); + expect(replies.first.id, threadId(21)); + expect(replies.last.id, threadId(25)); + }); + + test('greaterThan only excludes the cursor', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(limit: 5, greaterThan: threadId(5)), + ); + + // Strictly after id5 → id6..id29, capped to 5 → id6..id10. + expect(replies.length, 5); + expect(replies.first.id, threadId(6)); + expect(replies.last.id, threadId(10)); + }); + + test('greaterThanOrEqual only includes the cursor', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(limit: 5, greaterThanOrEqual: threadId(5)), + ); + + // From id5 onwards → id5..id29, capped to 5 → id5..id9. + expect(replies.length, 5); + expect(replies.first.id, threadId(5)); + expect(replies.last.id, threadId(9)); + }); + + test('cursor with tied createdAt does not skip or duplicate siblings', + () async { + // Three replies share an identical `createdAt`. The SQL ORDER BY uses + // the `(createdAt, id)` tuple, so within the trio the relative order is + // by id (lexicographic). A cursor at `reply_tieB` must split the trio + // cleanly: `reply_tieA` lands on the "before" side, `reply_tieC` on the + // "after" side. A `createdAt`-only WHERE predicate would collapse all + // three into the cursor's bucket and drop or keep them together. + final users = [User(id: 'tieUser')]; + await database.userDao.updateUsers(users); + await database.channelDao.updateChannels([ChannelModel(cid: cid)]); + + final tie = DateTime.now(); + final earlier = tie.subtract(const Duration(seconds: 1)); + final later = tie.add(const Duration(seconds: 1)); + + Message parent() => Message( + id: parentId, + user: users.first, + createdAt: earlier, + updatedAt: earlier, + text: parentId, + ); + + Message reply(String id, DateTime t) => Message( + id: id, + user: users.first, + parentId: parentId, + createdAt: t, + updatedAt: t, + text: id, + ); + + await messageDao.updateMessages(cid, [ + parent(), + reply('reply_pre', earlier), + reply('reply_tieA', tie), + reply('reply_tieB', tie), + reply('reply_tieC', tie), + reply('reply_post', later), + ]); + + final before = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams(limit: 100, lessThan: 'reply_tieB'), + ); + expect(before.map((m) => m.id).toList(), ['reply_pre', 'reply_tieA']); + + final after = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams(limit: 100, greaterThan: 'reply_tieB'), + ); + expect(after.map((m) => m.id).toList(), ['reply_tieC', 'reply_post']); + + final atOrBefore = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams( + limit: 100, + lessThanOrEqual: 'reply_tieB', + ), + ); + expect( + atOrBefore.map((m) => m.id).toList(), + ['reply_pre', 'reply_tieA', 'reply_tieB'], + ); + + final atOrAfter = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams( + limit: 100, + greaterThanOrEqual: 'reply_tieB', + ), + ); + expect( + atOrAfter.map((m) => m.id).toList(), + ['reply_tieB', 'reply_tieC', 'reply_post'], + ); + }); }); test('getMessagesByCid', () async {