From 88987e91b5cf49a42d8fc7dbcc2703b1471c87b6 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Tue, 19 May 2026 13:08:49 +0200 Subject: [PATCH 01/17] refactor(dao): Filter reactions by userId at DB level. --- .../lib/src/dao/reaction_dao.dart | 34 +++++++++++-------- .../test/src/dao/reaction_dao_test.dart | 13 +++++-- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart b/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart index d6dae9bd99..9d726bc8e3 100644 --- a/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart @@ -16,17 +16,10 @@ class ReactionDao extends DatabaseAccessor /// Returns all the reactions of a particular message by matching /// [Reactions.messageId] with [messageId] - Future> getReactions(String messageId) => - (select(reactions).join([ - leftOuterJoin(users, reactions.userId.equalsExp(users.id)), - ]) - ..where(reactions.messageId.equals(messageId)) - ..orderBy([OrderingTerm.asc(reactions.createdAt)])) - .map((rows) { - final userEntity = rows.readTableOrNull(users); - final reactionEntity = rows.readTable(reactions); - return reactionEntity.toReaction(user: userEntity?.toUser()); - }).get(); + Future> getReactions(String messageId) { + final where = reactions.messageId.equals(messageId); + return _selectReactions(where); + } /// Returns all the reactions of a particular message /// added by a particular user by matching @@ -35,9 +28,10 @@ class ReactionDao extends DatabaseAccessor Future> getReactionsByUserId( String messageId, String userId, - ) async { - final reactions = await getReactions(messageId); - return reactions.where((it) => it.userId == userId).toList(); + ) { + final where = + reactions.messageId.equals(messageId) & reactions.userId.equals(userId); + return _selectReactions(where); } /// Updates the reactions data with the new [reactionList] data @@ -57,4 +51,16 @@ class ReactionDao extends DatabaseAccessor (r) => r.messageId.isIn(messageIds), ); }); + + Future> _selectReactions(Expression where) { + final rows = select(reactions) + .join([leftOuterJoin(users, reactions.userId.equalsExp(users.id))]) + ..where(where) + ..orderBy([OrderingTerm.asc(reactions.createdAt)]); + return rows.map((row) { + final reactionEntity = row.readTable(reactions); + final userEntity = row.readTableOrNull(users); + return reactionEntity.toReaction(user: userEntity?.toUser()); + }).get(); + } } diff --git a/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart index 7fa3569e4e..38c0ff7d58 100644 --- a/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart @@ -80,19 +80,26 @@ void main() { test('getReactionsByUserId', () async { const messageId = 'testMessageId'; const userId = 'testUserId'; + const otherUserId = 'otherUserid'; // Should be empty initially final reactions = await reactionDao.getReactionsByUserId(messageId, userId); expect(reactions, isEmpty); - // Adding sample reactions + // Adding sample reactions from the target user. final insertedReactions = await _prepareReactionData(messageId, userId: userId); expect(insertedReactions, isNotEmpty); - // Fetched reaction length should match inserted reactions length. + // Adding sample reactions from other users on the same message. + final otherInsertedReactions = + await _prepareReactionData(messageId, userId: otherUserId); + expect(otherInsertedReactions, isNotEmpty); + + // Fetched reaction length should match the target user's reactions only. // Every reaction messageId should match the provided messageId. - // Every reaction userId should match the provided userId. + // Every reaction userId should match the provided userId — i.e. reactions + // from other users on the same message must be filtered out. final fetchedReactions = await reactionDao.getReactionsByUserId(messageId, userId); expect(fetchedReactions.length, insertedReactions.length); From fe4d4e28bc18568c79fd73638aca018a2af3c863 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Tue, 19 May 2026 14:41:49 +0200 Subject: [PATCH 02/17] refactor(dao): Filter reactions by userId at DB level (pinned messages). --- .../src/dao/pinned_message_reaction_dao.dart | 36 +++++++++++-------- .../dao/pinned_message_reaction_dao_test.dart | 13 +++++-- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart index c5c5a6d456..4ec053b436 100644 --- a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart @@ -16,17 +16,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor /// Returns all the reactions of a particular message by matching /// [Reactions.messageId] with [messageId] - Future> getReactions(String messageId) => - (select(pinnedMessageReactions).join([ - leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)), - ]) - ..where(pinnedMessageReactions.messageId.equals(messageId)) - ..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)])) - .map((rows) { - final userEntity = rows.readTableOrNull(users); - final reactionEntity = rows.readTable(pinnedMessageReactions); - return reactionEntity.toReaction(user: userEntity?.toUser()); - }).get(); + Future> getReactions(String messageId) { + final where = pinnedMessageReactions.messageId.equals(messageId); + return _selectReactions(where); + } /// Returns all the reactions of a particular message /// added by a particular user by matching @@ -34,10 +27,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor /// [Reactions.userId] with [userId] Future> getReactionsByUserId( String messageId, - String userId, - ) async { - final reactions = await getReactions(messageId); - return reactions.where((it) => it.userId == userId).toList(); + String userId,) { + final where = pinnedMessageReactions.messageId.equals( + messageId) & pinnedMessageReactions.userId.equals(userId); + return _selectReactions(where); } /// Updates the reactions data with the new [reactionList] data @@ -57,4 +50,17 @@ class PinnedMessageReactionDao extends DatabaseAccessor (r) => r.messageId.isIn(messageIds), ); }); + + Future> _selectReactions(Expression where) { + final rows = select(pinnedMessageReactions).join([ + leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)), + ]) + ..where(where) + ..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)]); + return rows.map((row) { + final reactionEntity = row.readTable(pinnedMessageReactions); + final userEntity = row.readTableOrNull(users); + return reactionEntity.toReaction(user: userEntity?.toUser()); + }).get(); + } } diff --git a/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart index 44abc4a644..640399ef87 100644 --- a/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart @@ -81,20 +81,27 @@ void main() { test('getReactionsByUserId', () async { const messageId = 'testMessageId'; const userId = 'testUserId'; + const otherUserId = 'otherUserid'; // Should be empty initially final reactions = await pinnedMessageReactionDao.getReactionsByUserId(messageId, userId); expect(reactions, isEmpty); - // Adding sample reactions + // Adding sample reactions from the target user. final insertedReactions = await _prepareReactionData(messageId, userId: userId); expect(insertedReactions, isNotEmpty); - // Fetched reaction length should match inserted reactions length. + // Adding sample reactions from other users on the same message. + final otherInsertedReactions = + await _prepareReactionData(messageId, userId: otherUserId); + expect(otherInsertedReactions, isNotEmpty); + + // Fetched reaction length should match the target user's reactions only. // Every reaction messageId should match the provided messageId. - // Every reaction userId should match the provided userId. + // Every reaction userId should match the provided userId — i.e. reactions + // from other users on the same message must be filtered out. final fetchedReactions = await pinnedMessageReactionDao.getReactionsByUserId(messageId, userId); expect(fetchedReactions.length, insertedReactions.length); From 466d9d3ee84ad498d64221bfe3ac496b88fbcf79 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Wed, 20 May 2026 12:01:43 +0200 Subject: [PATCH 03/17] refactor(dao): optimize message retrieval with SQL-side pagination filters --- .../lib/src/dao/message_dao.dart | 77 ++++----- .../test/src/dao/message_dao_test.dart | 147 +++++++++++++++++- 2 files changed, 180 insertions(+), 44 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index 77d84dff07..fdd84526fb 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -1,5 +1,3 @@ -import 'dart:math'; - import 'package:drift/drift.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; @@ -171,6 +169,15 @@ class MessageDao extends DatabaseAccessor bool fetchDraft = true, PaginationParams? messagePagination, }) async { + final lessThanCutoff = await switch (messagePagination?.lessThan) { + final id? => _lookupMessageCreatedAt(id), + _ => null, + }; + final greaterThanCutoff = await switch (messagePagination?.greaterThan) { + final id? => _lookupMessageCreatedAt(id), + _ => null, + }; + final query = select(messages).join([ leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), leftOuterJoin( @@ -180,44 +187,28 @@ class MessageDao extends DatabaseAccessor ]) ..where(messages.channelCid.equals(cid)) ..where(messages.parentId.isNull() | messages.showInChannel.equals(true)) - ..orderBy([OrderingTerm.asc(messages.createdAt)]); + ..orderBy([ + OrderingTerm.desc(messages.createdAt), + OrderingTerm.desc(messages.id), + ]); - final result = await query.get(); - if (result.isEmpty) return []; - - final msgList = await Future.wait( - result.map( - (row) => _messageFromJoinRow( - row, - fetchDraft: fetchDraft, - ), - ), - ); + if (lessThanCutoff case final t?) { + query.where(messages.createdAt.isSmallerThanValue(t)); + } + if (greaterThanCutoff case final t?) { + query.where(messages.createdAt.isBiggerOrEqualValue(t)); + } - if (msgList.isNotEmpty) { - if (messagePagination?.lessThan != null) { - final lessThanIndex = msgList.indexWhere( - (m) => m.id == messagePagination!.lessThan, - ); - if (lessThanIndex != -1) { - msgList.removeRange(lessThanIndex, msgList.length); - } - } - if (messagePagination?.greaterThan != null) { - final greaterThanIndex = msgList.indexWhere( - (m) => m.id == messagePagination!.greaterThan, - ); - if (greaterThanIndex != -1) { - msgList.removeRange(0, greaterThanIndex); - } - } - if (messagePagination?.limit != null) { - return msgList - .skip(max(0, msgList.length - messagePagination!.limit)) - .toList(); - } + if (messagePagination != null) { + query.limit(messagePagination.limit); } - return msgList; + + final rows = (await query.get()).reversed.toList(); + if (rows.isEmpty) return []; + + return Future.wait( + rows.map((row) => _messageFromJoinRow(row, fetchDraft: fetchDraft)), + ); } /// Updates the message data of a particular channel with @@ -241,4 +232,16 @@ class MessageDao extends DatabaseAccessor (batch) => batch.insertAllOnConflictUpdate(messages, entities), ); } + + /// Returns the `createdAt` of the message with [id] in the local cache, + /// or `null` if the message isn't cached. Used by [getMessagesByCid] to + /// resolve `lessThan` / `greaterThan` pagination cursors into SQL-comparable + /// timestamps so the filter runs in SQL instead of after the fact in Dart. + Future _lookupMessageCreatedAt(String id) { + return (selectOnly(messages) + ..addColumns([messages.createdAt]) + ..where(messages.id.equals(id))) + .map((row) => row.read(messages.createdAt)) + .getSingleOrNull(); + } } diff --git a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart index 29cda8c938..c4f2af1518 100644 --- a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart @@ -25,6 +25,12 @@ void main() { }) async { final channels = [ChannelModel(cid: cid)]; final users = List.generate(count, (index) => User(id: 'testUserId$index')); + // Strictly monotonic `createdAt` per message so SQL-side pagination + // filters (`WHERE createdAt < cutoff`, `ORDER BY createdAt ASC`) can't be + // confused by ties. Drift stores `DateTime` as integer Unix seconds by + // default, so the offset must be at least 1 second per row — otherwise + // sub-second offsets all round-trip onto the same second. + final baseTime = DateTime.now(); final messages = List.generate( count, (index) => Message( @@ -32,7 +38,7 @@ void main() { type: 'testType', user: users[index], channelRole: 'channel_member', - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -59,7 +65,7 @@ void main() { type: 'testType', user: users[index], channelRole: 'channel_member', - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -85,7 +91,7 @@ void main() { channelRole: 'channel_member', parentId: mapAllThreadToFirstMessage ? messages[0].id : messages[index].id, - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -294,7 +300,7 @@ void main() { const options = PaginationParams( limit: 15, lessThan: 'testThreadMessageId${cid}25', - greaterThanOrEqual: 'testThreadMessageId${cid}5', + greaterThan: 'testThreadMessageId${cid}5', ); // Messages should be empty initially @@ -320,6 +326,8 @@ void main() { ); expect(threadMessages.length, 15); expect(threadMessages.first.parentId, parentId); + expect(threadMessages.first.id, 'testThreadMessageId${cid}5'); + expect(threadMessages.last.id, 'testThreadMessageId${cid}19'); }); test('getMessagesByCid', () async { @@ -365,11 +373,11 @@ void main() { const cid = 'test:Cid'; const limit = 15; const lessThan = 'testMessageId${cid}25'; - const greaterThanOrEqual = 'testMessageId${cid}5'; + const greaterThan = 'testMessageId${cid}5'; const pagination = PaginationParams( limit: limit, lessThan: lessThan, - greaterThanOrEqual: greaterThanOrEqual, + greaterThan: greaterThan, ); // Should be empty initially @@ -389,8 +397,133 @@ void main() { messagePagination: pagination, ); expect(fetchedMessages.length, limit); + expect(fetchedMessages.first.id, 'testMessageId${cid}10'); expect(fetchedMessages.last.id, 'testMessageId${cid}24'); - expect(fetchedMessages.first.id != lessThan, true); + }); + + group('getMessagesByCid pagination', () { + const cid = 'test:Cid'; + + test('lessThan only trims messages from the end', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 25); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}24'); + }); + + test('greaterThan only trims messages from the start', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 25); + expect(fetchedMessages.first.id, 'testMessageId${cid}5'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('limit only keeps the last N messages', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams(limit: 15), + ); + + expect(fetchedMessages.length, 15); + expect(fetchedMessages.first.id, 'testMessageId${cid}15'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('lessThan id not in result set is a no-op', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'missing-id', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('greaterThan id not in result set is a no-op', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'missing-id', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('default PaginationParams() applies implicit limit of 10', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams(), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}20'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('default limit + lessThan returns last 10 of filtered set', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + lessThan: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}15'); + expect(fetchedMessages.last.id, 'testMessageId${cid}24'); + }); + + test('default limit + greaterThan returns last 10 of filtered set', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + greaterThan: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}20'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); }); test('updateMessages', () async { From e1ec8592f179501010b7e755b987a5a0fd73f314 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Thu, 21 May 2026 13:09:26 +0200 Subject: [PATCH 04/17] refactor(dao): fix formatting --- .../lib/src/dao/pinned_message_reaction_dao.dart | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart index 4ec053b436..6f5fdc6e68 100644 --- a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart @@ -27,9 +27,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor /// [Reactions.userId] with [userId] Future> getReactionsByUserId( String messageId, - String userId,) { - final where = pinnedMessageReactions.messageId.equals( - messageId) & pinnedMessageReactions.userId.equals(userId); + String userId, + ) { + final where = pinnedMessageReactions.messageId.equals(messageId) & + pinnedMessageReactions.userId.equals(userId); return _selectReactions(where); } From 468960622a1d24f0c0619cd3453a2687d1e9537c Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Thu, 21 May 2026 15:45:54 +0200 Subject: [PATCH 05/17] refactor(dao): Update CHANGELOG.md --- packages/stream_chat_persistence/CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/packages/stream_chat_persistence/CHANGELOG.md b/packages/stream_chat_persistence/CHANGELOG.md index d39ce5ba4c..147de6e493 100644 --- a/packages/stream_chat_persistence/CHANGELOG.md +++ b/packages/stream_chat_persistence/CHANGELOG.md @@ -1,3 +1,12 @@ +## Upcoming Changes + +🚀 Performance + +- Read only the messages matching the `PaginationParams` from DB when calling `MessageDao.getMessagesByCid` instead of reading all messages for the channel and applying pagination in memory. +- Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. +- Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. + + ## 9.24.0 - Updated `stream_chat` dependency to [`9.24.0`](https://pub.dev/packages/stream_chat/changelog). From edb86f7f66d734c2240d23d1bc0f1ab764c0da48 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Thu, 21 May 2026 16:12:20 +0200 Subject: [PATCH 06/17] refactor(dao): Apply formatting --- packages/stream_chat_persistence/lib/src/dao/message_dao.dart | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index fdd84526fb..2aed79cda5 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -239,8 +239,8 @@ class MessageDao extends DatabaseAccessor /// timestamps so the filter runs in SQL instead of after the fact in Dart. Future _lookupMessageCreatedAt(String id) { return (selectOnly(messages) - ..addColumns([messages.createdAt]) - ..where(messages.id.equals(id))) + ..addColumns([messages.createdAt]) + ..where(messages.id.equals(id))) .map((row) => row.read(messages.createdAt)) .getSingleOrNull(); } From 4efc8c42e7960d3ce30bec7305b7553ab501652c Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Fri, 22 May 2026 15:00:21 +0200 Subject: [PATCH 07/17] refactor(dao): optimize message and reaction retrieval with grouped queries --- packages/stream_chat_persistence/CHANGELOG.md | 2 +- .../lib/src/dao/draft_message_dao.dart | 22 + .../lib/src/dao/message_dao.dart | 240 +++++++++-- .../lib/src/dao/pinned_message_dao.dart | 228 ++++++++-- .../src/dao/pinned_message_reaction_dao.dart | 41 ++ .../lib/src/dao/poll_dao.dart | 55 +++ .../lib/src/dao/poll_vote_dao.dart | 28 ++ .../lib/src/dao/reaction_dao.dart | 40 ++ .../lib/src/db/query_utils.dart | 18 + .../test/src/dao/draft_message_dao_test.dart | 44 ++ .../test/src/dao/message_dao_test.dart | 406 ++++++++++++++++++ .../test/src/dao/pinned_message_dao_test.dart | 325 ++++++++++++++ .../dao/pinned_message_reaction_dao_test.dart | 13 + .../test/src/dao/poll_dao_test.dart | 76 ++++ .../test/src/dao/reaction_dao_test.dart | 77 ++++ 15 files changed, 1556 insertions(+), 59 deletions(-) create mode 100644 packages/stream_chat_persistence/lib/src/db/query_utils.dart diff --git a/packages/stream_chat_persistence/CHANGELOG.md b/packages/stream_chat_persistence/CHANGELOG.md index 147de6e493..0c77b99d64 100644 --- a/packages/stream_chat_persistence/CHANGELOG.md +++ b/packages/stream_chat_persistence/CHANGELOG.md @@ -5,7 +5,7 @@ - Read only the messages matching the `PaginationParams` from DB when calling `MessageDao.getMessagesByCid` instead of reading all messages for the channel and applying pagination in memory. - Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. - Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. - +- Improve the message read times from DB. ## 9.24.0 diff --git a/packages/stream_chat_persistence/lib/src/dao/draft_message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/draft_message_dao.dart index b5a5cb6fc9..00e6f7f798 100644 --- a/packages/stream_chat_persistence/lib/src/dao/draft_message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/draft_message_dao.dart @@ -3,6 +3,7 @@ import 'package:drift/drift.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; +import 'package:stream_chat_persistence/src/db/query_utils.dart'; import 'package:stream_chat_persistence/src/entity/entity.dart'; import 'package:stream_chat_persistence/src/mapper/mapper.dart'; @@ -66,6 +67,27 @@ class DraftMessageDao extends DatabaseAccessor return _draftFromEntity(result); } + /// Returns thread drafts in [cid] for every parent message id in + /// [parentIds], keyed by parent message id. + Future> getDraftMessagesByParentIds( + String cid, + List parentIds, + ) async { + if (parentIds.isEmpty) return const {}; + final result = {for (final id in parentIds) id: null}; + for (final chunk in chunked(parentIds)) { + final query = select(draftMessages) + ..where((tbl) => tbl.channelCid.equals(cid) & tbl.parentId.isIn(chunk)); + final entities = await query.get(); + for (final entity in entities) { + if (entity.parentId case final pid?) { + result[pid] = await _draftFromEntity(entity); + } + } + } + return result; + } + /// Updates the draft message data of a particular channel with /// the new [messageList] data. Future updateDraftMessages(List draftMessageList) { diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index 2aed79cda5..06aa4ffe3d 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -1,4 +1,7 @@ +import 'dart:math'; + import 'package:drift/drift.dart'; +import 'package:flutter/foundation.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; import 'package:stream_chat_persistence/src/entity/messages.dart'; @@ -76,6 +79,125 @@ class MessageDao extends DatabaseAccessor ); } + /// Hydrates `rows` into `Message`s using batched lookups for related + /// entities. Reactions, polls, quoted messages and (optionally) drafts are + /// each fetched once via a single `WHERE ... IN (?). + Future> _messagesFromJoinRows( + List rows, { + bool fetchDraft = false, + }) async { + if (rows.isEmpty) return const []; + + final messageIds = []; + final quotedIds = []; + final pollIds = []; + // note: While possible, in real case scenarios this will NOT hold more than + // a single value. + final cids = {}; + for (final row in rows) { + final msg = row.readTable(messages); + messageIds.add(msg.id); + if (msg.quotedMessageId case final id?) quotedIds.add(id); + if (msg.pollId case final id?) pollIds.add(id); + cids.add(msg.channelCid); + } + + final results = await Future.wait([ + // Reactions + _db.reactionDao.getReactionsForMessages(messageIds), + // Own reactions + _db.reactionDao.getReactionsForMessagesByUserId(messageIds, _db.userId), + // Polls + if (pollIds.isNotEmpty) + _db.pollDao.getPollsByIds(pollIds) + else + Future.value(const {}), + // Drafts + if (fetchDraft) + Future.wait([ + for (final cid in cids) + _db.draftMessageDao + .getDraftMessagesByParentIds(cid, messageIds) + .then((map) => MapEntry(cid, map)), + ]).then(Map.fromEntries) + else + Future.value(const >{}), + ]); + + final latestReactionsByMsg = results[0] as Map>; + final ownReactionsByMsg = results[1] as Map>; + final pollsById = results[2] as Map; + final draftsByCidByParentId = + results[3] as Map>; + + final quotedById = {}; + if (quotedIds.isNotEmpty) { + final quoteRows = await (select(messages).join([ + leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), + leftOuterJoin( + _pinnedByUsers, + messages.pinnedByUserId.equalsExp(_pinnedByUsers.id), + ), + ]) + ..where(messages.id.isIn(quotedIds))) + .get(); + final quotedMessages = await _messagesFromJoinRows( + quoteRows, + fetchDraft: true, + ); + for (final m in quotedMessages) { + quotedById[m.id] = m; + } + } + + return [ + for (final row in rows) + _buildMessage( + row, + latestReactionsByMsg: latestReactionsByMsg, + ownReactionsByMsg: ownReactionsByMsg, + pollsById: pollsById, + quotedById: quotedById, + draftsByCidByParentId: draftsByCidByParentId, + ), + ]; + } + + /// Builds a single [Message] from a join row + the pre-fetched maps + /// assembled by [_messagesFromJoinRows]. + Message _buildMessage( + TypedResult row, { + required Map> latestReactionsByMsg, + required Map> ownReactionsByMsg, + required Map pollsById, + required Map quotedById, + required Map> draftsByCidByParentId, + }) { + final userEntity = row.readTableOrNull(_users); + final pinnedByEntity = row.readTableOrNull(_pinnedByUsers); + final msgEntity = row.readTable(messages); + + final quotedMessage = switch (msgEntity.quotedMessageId) { + final id? => quotedById[id], + _ => null, + }; + final poll = switch (msgEntity.pollId) { + final id? => pollsById[id], + _ => null, + }; + final draft = draftsByCidByParentId[msgEntity.channelCid]?[msgEntity.id]; + + return msgEntity.toMessage( + user: userEntity?.toUser(), + pinnedBy: pinnedByEntity?.toUser(), + latestReactions: latestReactionsByMsg[msgEntity.id] ?? const [], + ownReactions: ownReactionsByMsg[msgEntity.id] ?? const [], + quotedMessage: quotedMessage, + poll: poll, + draft: draft, + ); + } + /// Returns a single message by matching the [Messages.id] with [id]. /// /// If [fetchDraft] is true, it will also fetch the draft message for the @@ -96,27 +218,27 @@ class MessageDao extends DatabaseAccessor final result = await query.getSingleOrNull(); if (result == null) return null; - return _messageFromJoinRow( - result, - fetchDraft: fetchDraft, - ); + final hydrated = + await _messagesFromJoinRows([result], fetchDraft: fetchDraft); + return hydrated.firstOrNull; } /// Returns all the messages of a particular thread by matching /// [Messages.channelCid] with [cid] - Future> getThreadMessages(String cid) async => - Future.wait(await (select(messages).join([ - leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), - leftOuterJoin( - _pinnedByUsers, - messages.pinnedByUserId.equalsExp(_pinnedByUsers.id), - ), - ]) - ..where(messages.channelCid.equals(cid)) - ..where(messages.parentId.isNotNull()) - ..orderBy([OrderingTerm.asc(messages.createdAt)])) - .map(_messageFromJoinRow) - .get()); + Future> getThreadMessages(String cid) async { + final rows = await (select(messages).join([ + leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), + leftOuterJoin( + _pinnedByUsers, + messages.pinnedByUserId.equalsExp(_pinnedByUsers.id), + ), + ]) + ..where(messages.channelCid.equals(cid)) + ..where(messages.parentId.isNotNull()) + ..orderBy([OrderingTerm.asc(messages.createdAt)])) + .get(); + return _messagesFromJoinRows(rows); + } /// Returns all the messages of a particular thread by matching /// [Messages.parentId] with [parentId] @@ -124,7 +246,7 @@ class MessageDao extends DatabaseAccessor String parentId, { PaginationParams? options, }) async { - final msgList = await Future.wait(await (select(messages).join([ + final rows = await (select(messages).join([ leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), leftOuterJoin( _pinnedByUsers, @@ -134,29 +256,91 @@ class MessageDao extends DatabaseAccessor ..where(messages.parentId.isNotNull()) ..where(messages.parentId.equals(parentId)) ..orderBy([OrderingTerm.asc(messages.createdAt)])) - .map(_messageFromJoinRow) - .get()); + .get(); + final msgList = await _messagesFromJoinRows(rows); if (msgList.isNotEmpty) { + final mutable = msgList.toList(); if (options?.lessThan != null) { - final lessThanIndex = msgList.indexWhere( + final lessThanIndex = mutable.indexWhere( (m) => m.id == options!.lessThan, ); if (lessThanIndex != -1) { - msgList.removeRange(lessThanIndex, msgList.length); + mutable.removeRange(lessThanIndex, mutable.length); } } if (options?.greaterThan != null) { - final greaterThanIndex = msgList.indexWhere( + final greaterThanIndex = mutable.indexWhere( (m) => m.id == options!.greaterThan, ); if (greaterThanIndex != -1) { - msgList.removeRange(0, greaterThanIndex); + mutable.removeRange(0, greaterThanIndex); } } final limit = options?.limit; if (limit != null && limit > 0) { - return msgList.take(limit).toList(); + return mutable.take(limit).toList(); + } + return mutable; + } + return msgList; + } + + /// Pre-SQL-pushdown reference implementation of [getMessagesByCid]. Fetches + /// every cached message for the channel, hydrates each row, then trims the + /// result in Dart. Kept only as the head-to-head baseline for the + /// `get_messages_by_cid_bench_test.dart` benchmark — remove once we no + /// longer need behavioral parity proof. + @visibleForTesting + Future> getMessagesByCidLegacy( + String cid, { + bool fetchDraft = true, + PaginationParams? messagePagination, + }) async { + final query = select(messages).join([ + leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), + leftOuterJoin( + _pinnedByUsers, + messages.pinnedByUserId.equalsExp(_pinnedByUsers.id), + ), + ]) + ..where(messages.channelCid.equals(cid)) + ..where(messages.parentId.isNull() | messages.showInChannel.equals(true)) + ..orderBy([OrderingTerm.asc(messages.createdAt)]); + + final result = await query.get(); + if (result.isEmpty) return []; + + final msgList = await Future.wait( + result.map( + (row) => _messageFromJoinRow( + row, + fetchDraft: fetchDraft, + ), + ), + ); + + if (msgList.isNotEmpty) { + if (messagePagination?.lessThan != null) { + final lessThanIndex = msgList.indexWhere( + (m) => m.id == messagePagination!.lessThan, + ); + if (lessThanIndex != -1) { + msgList.removeRange(lessThanIndex, msgList.length); + } + } + if (messagePagination?.greaterThan != null) { + final greaterThanIndex = msgList.indexWhere( + (m) => m.id == messagePagination!.greaterThan, + ); + if (greaterThanIndex != -1) { + msgList.removeRange(0, greaterThanIndex); + } + } + if (messagePagination?.limit != null) { + return msgList + .skip(max(0, msgList.length - messagePagination!.limit)) + .toList(); } } return msgList; @@ -204,11 +388,7 @@ class MessageDao extends DatabaseAccessor } final rows = (await query.get()).reversed.toList(); - if (rows.isEmpty) return []; - - return Future.wait( - rows.map((row) => _messageFromJoinRow(row, fetchDraft: fetchDraft)), - ); + return _messagesFromJoinRows(rows, fetchDraft: fetchDraft); } /// Updates the message data of a particular channel with diff --git a/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart index 7992accf5d..f556c68b71 100644 --- a/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart @@ -1,4 +1,5 @@ import 'package:drift/drift.dart'; +import 'package:flutter/foundation.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; import 'package:stream_chat_persistence/src/entity/pinned_messages.dart'; @@ -79,6 +80,123 @@ class PinnedMessageDao extends DatabaseAccessor ); } + /// Hydrates `rows` (from the pinned-messages table) into `Message`s using + /// batched lookups for related entities. + Future> _messagesFromJoinRows( + List rows, { + bool fetchDraft = false, + }) async { + if (rows.isEmpty) return const []; + + final messageIds = []; + final quotedIds = []; + final pollIds = []; + // note: While possible, in real case scenarios this will NOT hold more than + // a single value. + final cids = {}; + for (final row in rows) { + final msg = row.readTable(pinnedMessages); + messageIds.add(msg.id); + if (msg.quotedMessageId case final id?) quotedIds.add(id); + if (msg.pollId case final id?) pollIds.add(id); + cids.add(msg.channelCid); + } + + final results = await Future.wait([ + // Reactions + _db.pinnedMessageReactionDao.getReactionsForMessages(messageIds), + // Own reactions + _db.pinnedMessageReactionDao + .getReactionsForMessagesByUserId(messageIds, _db.userId), + // Polls + if (pollIds.isNotEmpty) + _db.pollDao.getPollsByIds(pollIds) + else + Future.value(const {}), + // Drafts + if (fetchDraft) + Future.wait([ + for (final cid in cids) + _db.draftMessageDao + .getDraftMessagesByParentIds(cid, messageIds) + .then((map) => MapEntry(cid, map)), + ]).then(Map.fromEntries) + else + Future.value(const >{}), + ]); + + final latestReactionsByMsg = results[0] as Map>; + final ownReactionsByMsg = results[1] as Map>; + final pollsById = results[2] as Map; + final draftsByCidByParentId = + results[3] as Map>; + + final quotedById = {}; + if (quotedIds.isNotEmpty) { + final quoteRows = await (select(pinnedMessages).join([ + leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)), + leftOuterJoin( + _pinnedByUsers, + pinnedMessages.pinnedByUserId.equalsExp(_pinnedByUsers.id), + ), + ]) + ..where(pinnedMessages.id.isIn(quotedIds))) + .get(); + final quotedMessages = await _messagesFromJoinRows( + quoteRows, + fetchDraft: true, + ); + for (final m in quotedMessages) { + quotedById[m.id] = m; + } + } + + return [ + for (final row in rows) + _buildMessage( + row, + latestReactionsByMsg: latestReactionsByMsg, + ownReactionsByMsg: ownReactionsByMsg, + pollsById: pollsById, + quotedById: quotedById, + draftsByCidByParentId: draftsByCidByParentId, + ), + ]; + } + + Message _buildMessage( + TypedResult row, { + required Map> latestReactionsByMsg, + required Map> ownReactionsByMsg, + required Map pollsById, + required Map quotedById, + required Map> draftsByCidByParentId, + }) { + final userEntity = row.readTableOrNull(_users); + final pinnedByEntity = row.readTableOrNull(_pinnedByUsers); + final msgEntity = row.readTable(pinnedMessages); + + final quotedMessage = switch (msgEntity.quotedMessageId) { + final id? => quotedById[id], + _ => null, + }; + final poll = switch (msgEntity.pollId) { + final id? => pollsById[id], + _ => null, + }; + final draft = draftsByCidByParentId[msgEntity.channelCid]?[msgEntity.id]; + + return msgEntity.toMessage( + user: userEntity?.toUser(), + pinnedBy: pinnedByEntity?.toUser(), + latestReactions: latestReactionsByMsg[msgEntity.id] ?? const [], + ownReactions: ownReactionsByMsg[msgEntity.id] ?? const [], + quotedMessage: quotedMessage, + poll: poll, + draft: draft, + ); + } + /// Returns a single message by matching the [PinnedMessages.id] with [id] Future getMessageById( String id, { @@ -96,27 +214,27 @@ class PinnedMessageDao extends DatabaseAccessor final result = await query.getSingleOrNull(); if (result == null) return null; - return _messageFromJoinRow( - result, - fetchDraft: fetchDraft, - ); + final hydrated = + await _messagesFromJoinRows([result], fetchDraft: fetchDraft); + return hydrated.firstOrNull; } /// Returns all the messages of a particular thread by matching /// [PinnedMessages.channelCid] with [cid] - Future> getThreadMessages(String cid) async => - Future.wait(await (select(pinnedMessages).join([ - leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)), - leftOuterJoin( - _pinnedByUsers, - pinnedMessages.pinnedByUserId.equalsExp(_pinnedByUsers.id), - ), - ]) - ..where(pinnedMessages.channelCid.equals(cid)) - ..where(pinnedMessages.parentId.isNotNull()) - ..orderBy([OrderingTerm.asc(pinnedMessages.createdAt)])) - .map(_messageFromJoinRow) - .get()); + Future> getThreadMessages(String cid) async { + final rows = await (select(pinnedMessages).join([ + leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)), + leftOuterJoin( + _pinnedByUsers, + pinnedMessages.pinnedByUserId.equalsExp(_pinnedByUsers.id), + ), + ]) + ..where(pinnedMessages.channelCid.equals(cid)) + ..where(pinnedMessages.parentId.isNotNull()) + ..orderBy([OrderingTerm.asc(pinnedMessages.createdAt)])) + .get(); + return _messagesFromJoinRows(rows); + } /// Returns all the messages of a particular thread by matching /// [PinnedMessages.parentId] with [parentId] @@ -124,7 +242,7 @@ class PinnedMessageDao extends DatabaseAccessor String parentId, { PaginationParams? options, }) async { - final msgList = await Future.wait(await (select(pinnedMessages).join([ + final rows = await (select(pinnedMessages).join([ leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)), leftOuterJoin( _pinnedByUsers, @@ -134,36 +252,42 @@ class PinnedMessageDao extends DatabaseAccessor ..where(pinnedMessages.parentId.isNotNull()) ..where(pinnedMessages.parentId.equals(parentId)) ..orderBy([OrderingTerm.asc(pinnedMessages.createdAt)])) - .map(_messageFromJoinRow) - .get()); + .get(); + final msgList = await _messagesFromJoinRows(rows); if (msgList.isNotEmpty) { + final mutable = msgList.toList(); if (options?.lessThan != null) { - final lessThanIndex = msgList.indexWhere( + final lessThanIndex = mutable.indexWhere( (m) => m.id == options!.lessThan, ); if (lessThanIndex != -1) { - msgList.removeRange(lessThanIndex, msgList.length); + mutable.removeRange(lessThanIndex, mutable.length); } } if (options?.greaterThan != null) { - final greaterThanIndex = msgList.indexWhere( + final greaterThanIndex = mutable.indexWhere( (m) => m.id == options!.greaterThan, ); if (greaterThanIndex != -1) { - msgList.removeRange(0, greaterThanIndex); + mutable.removeRange(0, greaterThanIndex); } } if (options?.limit != null) { - return msgList.take(options!.limit).toList(); + return mutable.take(options!.limit).toList(); } + return mutable; } return msgList; } - /// Returns all the messages of a channel by matching - /// [PinnedMessages.channelCid] with [parentId] - Future> getMessagesByCid( + /// Pre-batched-hydration reference implementation of [getMessagesByCid]. + /// Same shape as the current public method but uses the per-row N+1 + /// [_messageFromJoinRow] path. Kept only as the head-to-head baseline for + /// `get_messages_by_cid_pinned_hydration_bench_test.dart` and the pinned + /// parity test — remove once we no longer need behavioral parity proof. + @visibleForTesting + Future> getMessagesByCidLegacy( String cid, { bool fetchDraft = true, PaginationParams? messagePagination, @@ -216,6 +340,54 @@ class PinnedMessageDao extends DatabaseAccessor return msgList; } + /// Returns all the messages of a channel by matching + /// [PinnedMessages.channelCid] with [parentId] + Future> getMessagesByCid( + String cid, { + bool fetchDraft = true, + PaginationParams? messagePagination, + }) async { + final query = select(pinnedMessages).join([ + leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)), + leftOuterJoin( + _pinnedByUsers, + pinnedMessages.pinnedByUserId.equalsExp(_pinnedByUsers.id), + ), + ]) + ..where(pinnedMessages.channelCid.equals(cid)) + ..where(pinnedMessages.parentId.isNull() | + pinnedMessages.showInChannel.equals(true)) + ..orderBy([OrderingTerm.asc(pinnedMessages.createdAt)]); + + final rows = await query.get(); + final msgList = await _messagesFromJoinRows(rows, fetchDraft: fetchDraft); + + if (msgList.isNotEmpty) { + final mutable = msgList.toList(); + if (messagePagination?.lessThan != null) { + final lessThanIndex = mutable.indexWhere( + (m) => m.id == messagePagination!.lessThan, + ); + if (lessThanIndex != -1) { + mutable.removeRange(lessThanIndex, mutable.length); + } + } + if (messagePagination?.greaterThan != null) { + final greaterThanIndex = mutable.indexWhere( + (m) => m.id == messagePagination!.greaterThan, + ); + if (greaterThanIndex != -1) { + mutable.removeRange(0, greaterThanIndex); + } + } + if (messagePagination?.limit != null) { + return mutable.take(messagePagination!.limit).toList(); + } + return mutable; + } + return msgList; + } + /// Updates the message data of a particular channel with /// the new [messageList] data Future updateMessages(String cid, List messageList) => diff --git a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart index 6f5fdc6e68..c319809fd0 100644 --- a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart @@ -1,6 +1,7 @@ import 'package:drift/drift.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; +import 'package:stream_chat_persistence/src/db/query_utils.dart'; import 'package:stream_chat_persistence/src/entity/pinned_message_reactions.dart'; import 'package:stream_chat_persistence/src/entity/users.dart'; import 'package:stream_chat_persistence/src/mapper/mapper.dart'; @@ -34,6 +35,46 @@ class PinnedMessageReactionDao extends DatabaseAccessor return _selectReactions(where); } + /// Returns pinned-message reactions for every id in [messageIds], grouped + /// by message id. + Future>> getReactionsForMessages( + List messageIds, + ) async { + if (messageIds.isEmpty) return const {}; + final grouped = >{ + for (final id in messageIds) id: [], + }; + for (final chunk in chunked(messageIds)) { + final where = pinnedMessageReactions.messageId.isIn(chunk); + final rows = await _selectReactions(where); + for (final r in rows) { + grouped[r.messageId]!.add(r); + } + } + return grouped; + } + + /// Returns pinned-message reactions for every id in [messageIds] that were + /// added by [userId], grouped by message id. + Future>> getReactionsForMessagesByUserId( + List messageIds, + String userId, + ) async { + if (messageIds.isEmpty) return const {}; + final grouped = >{ + for (final id in messageIds) id: [], + }; + for (final chunk in chunked(messageIds)) { + final where = pinnedMessageReactions.messageId.isIn(chunk) & + pinnedMessageReactions.userId.equals(userId); + final rows = await _selectReactions(where); + for (final r in rows) { + grouped[r.messageId]!.add(r); + } + } + return grouped; + } + /// Updates the reactions data with the new [reactionList] data Future updateReactions(List reactionList) => batch((it) { it.insertAllOnConflictUpdate( diff --git a/packages/stream_chat_persistence/lib/src/dao/poll_dao.dart b/packages/stream_chat_persistence/lib/src/dao/poll_dao.dart index 10a10a024d..8d974e15c4 100644 --- a/packages/stream_chat_persistence/lib/src/dao/poll_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/poll_dao.dart @@ -1,6 +1,7 @@ import 'package:drift/drift.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; +import 'package:stream_chat_persistence/src/db/query_utils.dart'; import 'package:stream_chat_persistence/src/entity/poll_votes.dart'; import 'package:stream_chat_persistence/src/entity/polls.dart'; import 'package:stream_chat_persistence/src/entity/users.dart'; @@ -51,6 +52,32 @@ class PollDao extends DatabaseAccessor with _$PollDaoMixin { .map(_pollFromJoinRow) .getSingleOrNull(); + /// Returns polls for every id in [pollIds], keyed by poll id. + Future> getPollsByIds(List pollIds) async { + if (pollIds.isEmpty) return const {}; + + // Group votes once for every requested poll. The dense-map contract on + // `getPollVotesForPolls` means every input id resolves to a (possibly + // empty) list with a single lookup. + final votesByPoll = await _db.pollVoteDao.getPollVotesForPolls(pollIds); + + final result = {for (final id in pollIds) id: null}; + for (final chunk in chunked(pollIds)) { + final where = polls.id.isIn(chunk); + final rows = await (select(polls)..where((_) => where)).join( + [leftOuterJoin(users, polls.createdById.equalsExp(users.id))]).get(); + for (final row in rows) { + final pollEntity = row.readTable(polls); + // Same as `_pollFromJoinRow` => reads users via `readTable` (not + // `readTableOrNull`) on a LEFT JOIN + final userEntity = row.readTable(users); + final allVotes = votesByPoll[pollEntity.id] ?? const []; + result[pollEntity.id] = _buildPoll(pollEntity, userEntity, allVotes); + } + } + return result; + } + /// Updates all the polls using the new [pollList] data Future updatePolls(List pollList) => batch( (it) => it.insertAllOnConflictUpdate( @@ -69,4 +96,32 @@ class PollDao extends DatabaseAccessor with _$PollDaoMixin { /// Deletes all the polls whose [Polls.id] is present in [pollIds] Future deletePollsByIds(List pollIds) => (delete(polls)..where((tbl) => tbl.id.isIn(pollIds))).go(); + + Poll _buildPoll( + PollEntity pollEntity, + UserEntity userEntity, + List allVotes, + ) { + final latestAnswers = allVotes.where((it) => it.isAnswer); + final ownVotesAndAnswers = allVotes.where((it) => it.userId == _db.userId); + + final latestVotesByOption = >{}; + for (final vote in allVotes) { + if (vote.isAnswer) continue; + if (vote.optionId case final optionId?) { + latestVotesByOption.update( + optionId, + (value) => [...value, vote], + ifAbsent: () => [vote], + ); + } + } + + return pollEntity.toPoll( + createdBy: userEntity.toUser(), + latestAnswers: latestAnswers.toList(), + ownVotesAndAnswers: ownVotesAndAnswers.toList(), + latestVotesByOption: latestVotesByOption, + ); + } } diff --git a/packages/stream_chat_persistence/lib/src/dao/poll_vote_dao.dart b/packages/stream_chat_persistence/lib/src/dao/poll_vote_dao.dart index 55884f4a70..571ecfa568 100644 --- a/packages/stream_chat_persistence/lib/src/dao/poll_vote_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/poll_vote_dao.dart @@ -1,6 +1,7 @@ import 'package:drift/drift.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; +import 'package:stream_chat_persistence/src/db/query_utils.dart'; import 'package:stream_chat_persistence/src/entity/poll_votes.dart'; import 'package:stream_chat_persistence/src/entity/polls.dart'; import 'package:stream_chat_persistence/src/entity/users.dart'; @@ -30,6 +31,33 @@ class PollVoteDao extends DatabaseAccessor return pollVoteEntity.toPollVote(user: userEntity?.toUser()); }).get(); + /// Returns poll votes for every id in [pollIds], grouped by poll id. + Future>> getPollVotesForPolls( + List pollIds, + ) async { + if (pollIds.isEmpty) return const {}; + final grouped = >{ + for (final id in pollIds) id: [], + }; + for (final chunk in chunked(pollIds)) { + final where = pollVotes.pollId.isIn(chunk); + final rows = await (select(pollVotes).join([ + leftOuterJoin(users, pollVotes.userId.equalsExp(users.id)), + ]) + ..where(where) + ..orderBy([OrderingTerm.asc(pollVotes.createdAt)])) + .map((row) { + final userEntity = row.readTableOrNull(users); + final pollVoteEntity = row.readTable(pollVotes); + return pollVoteEntity.toPollVote(user: userEntity?.toUser()); + }).get(); + for (final v in rows) { + grouped[v.pollId]!.add(v); + } + } + return grouped; + } + /// Updates the poll votes data with the new [pollVoteList] data Future updatePollVotes(List pollVoteList) => batch( (it) => it.insertAllOnConflictUpdate( diff --git a/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart b/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart index 9d726bc8e3..8a8cd2bf57 100644 --- a/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart @@ -1,6 +1,7 @@ import 'package:drift/drift.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; +import 'package:stream_chat_persistence/src/db/query_utils.dart'; import 'package:stream_chat_persistence/src/entity/reactions.dart'; import 'package:stream_chat_persistence/src/entity/users.dart'; import 'package:stream_chat_persistence/src/mapper/mapper.dart'; @@ -34,6 +35,45 @@ class ReactionDao extends DatabaseAccessor return _selectReactions(where); } + /// Returns reactions for every id in [messageIds], grouped by message id. + Future>> getReactionsForMessages( + List messageIds, + ) async { + if (messageIds.isEmpty) return const {}; + final grouped = >{ + for (final id in messageIds) id: [], + }; + for (final chunk in chunked(messageIds)) { + final where = reactions.messageId.isIn(chunk); + final rows = await _selectReactions(where); + for (final r in rows) { + grouped[r.messageId]!.add(r); + } + } + return grouped; + } + + /// Returns reactions for every id in [messageIds] that were added by + /// [userId], grouped by message id. + Future>> getReactionsForMessagesByUserId( + List messageIds, + String userId, + ) async { + if (messageIds.isEmpty) return const {}; + final grouped = >{ + for (final id in messageIds) id: [], + }; + for (final chunk in chunked(messageIds)) { + final where = + reactions.messageId.isIn(chunk) & reactions.userId.equals(userId); + final rows = await _selectReactions(where); + for (final r in rows) { + grouped[r.messageId]!.add(r); + } + } + return grouped; + } + /// Updates the reactions data with the new [reactionList] data Future updateReactions(List reactionList) => batch((it) { it.insertAllOnConflictUpdate( diff --git a/packages/stream_chat_persistence/lib/src/db/query_utils.dart b/packages/stream_chat_persistence/lib/src/db/query_utils.dart new file mode 100644 index 0000000000..1a2f966592 --- /dev/null +++ b/packages/stream_chat_persistence/lib/src/db/query_utils.dart @@ -0,0 +1,18 @@ +import 'dart:math' as math; + +/// Splits a list of bind parameters into chunks safe for a single +/// `WHERE col IN (?, ?, ...)` SQLite statement. +/// +/// SQLite enforces `SQLITE_MAX_VARIABLE_NUMBER` (default 999 on the Android +/// build shipped via `sqlite3_flutter_libs`) on the number of `?` placeholders +/// per statement. Each id in the list becomes one placeholder, so an +/// unbounded id list would fail the query outright with +/// `too many SQL variables`. Callers that batch-load related rows by id +/// must run one SELECT per chunk and merge the results in Dart. The default +/// chunk size of 900 leaves headroom for any other bound parameters that +/// share the same statement (for example a `AND userId = ?` filter). +Iterable> chunked(List input, [int size = 900]) sync* { + for (var i = 0; i < input.length; i += size) { + yield input.sublist(i, math.min(i + size, input.length)); + } +} diff --git a/packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart index 15c8e48b2f..9de6fc71c8 100644 --- a/packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart @@ -378,6 +378,50 @@ void main() { }); }); + test( + 'getDraftMessageByCid hydrates parent message with draft=null ' + '(propagates fetchDraft=false to prevent recursion)', () async { + const cid = 'test:fetchDraftPropagation'; + const parentId = 'parent-msg'; + + final user = User(id: 'testUserId'); + final parentMessage = Message( + id: parentId, + user: user, + createdAt: DateTime.now(), + text: 'Parent', + ); + + await database.userDao.updateUsers([user]); + await database.channelDao.updateChannels([ChannelModel(cid: cid)]); + await database.messageDao.updateMessages(cid, [parentMessage]); + + await draftMessageDao.updateDraftMessages([ + Draft( + channelCid: cid, + parentId: parentId, + createdAt: DateTime.now(), + message: DraftMessage( + id: 'thread-draft', + text: 'reply draft', + parentId: parentId, + ), + ), + ]); + + final fetched = + await draftMessageDao.getDraftMessageByCid(cid, parentId: parentId); + + expect(fetched, isNotNull); + expect(fetched!.parentMessage, isNotNull); + expect(fetched.parentMessage!.id, parentId); + expect( + fetched.parentMessage!.draft, + isNull, + reason: 'parent message hydration must pass fetchDraft=false', + ); + }); + group('DraftMessages entity references', () { test( 'should delete draft messages when referenced channel is deleted', diff --git a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart index c4f2af1518..53ef93d217 100644 --- a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart @@ -568,6 +568,412 @@ void main() { ); }); + // Covers message enrichment logic + group('hydration', () { + const cid = 'test:Hydration'; + + Future _seedChannel(String channelCid) async { + await database.channelDao.updateChannels([ChannelModel(cid: channelCid)]); + } + + test('getMessageById hydrates multiple latest and own reactions', () async { + const messageId = 'msg-multi-reactions'; + await _seedChannel(cid); + + final dbUser = User(id: 'testUserId'); + final otherUser = User(id: 'otherUser'); + await database.userDao.updateUsers([dbUser, otherUser]); + + await messageDao.updateMessages(cid, [ + Message( + id: messageId, + type: 'regular', + user: dbUser, + text: 'Hello', + createdAt: DateTime.now(), + ), + ]); + + // 2 reactions by the DB user, 1 by another user. + await database.reactionDao.updateReactions([ + Reaction( + type: 'like', + messageId: messageId, + user: dbUser, + createdAt: DateTime.now(), + ), + Reaction( + type: 'love', + messageId: messageId, + user: dbUser, + createdAt: DateTime.now().add(const Duration(seconds: 1)), + ), + Reaction( + type: 'wow', + messageId: messageId, + user: otherUser, + createdAt: DateTime.now().add(const Duration(seconds: 2)), + ), + ]); + + final fetched = await messageDao.getMessageById(messageId); + expect(fetched, isNotNull); + expect(fetched!.latestReactions, hasLength(3)); + expect(fetched.ownReactions, hasLength(2)); + expect( + fetched.ownReactions!.every((r) => r.user?.id == dbUser.id), + isTrue, + ); + expect( + fetched.latestReactions!.map((r) => r.type).toSet(), + equals({'like', 'love', 'wow'}), + ); + }); + + test('getMessagesByCid hydrates reactions per row independently', () async { + await _seedChannel(cid); + + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + final baseTime = DateTime.now(); + final messages = List.generate( + 5, + (i) => Message( + id: 'msg-iso-$i', + type: 'regular', + user: dbUser, + text: 'Hello $i', + createdAt: baseTime.add(Duration(seconds: i)), + ), + ); + await messageDao.updateMessages(cid, messages); + + // 2 reactions per message, distinct types per-row. + final reactions = [ + for (var i = 0; i < messages.length; i++) ...[ + Reaction( + type: 'like-$i', + messageId: messages[i].id, + user: dbUser, + createdAt: baseTime.add(Duration(seconds: i)), + ), + Reaction( + type: 'love-$i', + messageId: messages[i].id, + user: dbUser, + createdAt: baseTime.add(Duration(seconds: i, milliseconds: 1)), + ), + ], + ]; + await database.reactionDao.updateReactions(reactions); + + final fetched = await messageDao.getMessagesByCid(cid); + expect(fetched, hasLength(5)); + for (final m in fetched) { + expect(m.latestReactions, hasLength(2)); + // Reactions must belong to this message only — no cross-contamination + // between rows when batched-hydration replaces the per-row queries. + expect( + m.latestReactions!.map((r) => r.type).toSet(), + equals( + {'like-${m.id.split('-').last}', 'love-${m.id.split('-').last}'}), + ); + } + }); + + test('getMessagesByCid hydrates poll with own + other-user votes', + () async { + const messageId = 'msg-with-poll'; + const pollId = 'poll-mixed'; + await _seedChannel(cid); + + final dbUser = User(id: 'testUserId'); + final otherUser = User(id: 'otherUser'); + await database.userDao.updateUsers([dbUser, otherUser]); + + const optionA = PollOption(id: 'opt-a', text: 'A'); + const optionB = PollOption(id: 'opt-b', text: 'B'); + + final poll = Poll( + id: pollId, + name: 'Pick one', + options: const [optionA, optionB], + createdBy: dbUser, + createdById: dbUser.id, + ); + await database.pollDao.updatePolls([poll]); + + await messageDao.updateMessages(cid, [ + Message( + id: messageId, + type: 'regular', + user: dbUser, + text: 'Vote please', + createdAt: DateTime.now(), + pollId: pollId, + ), + ]); + + // 2 own votes (one per option), 2 other-user votes, 1 own answer. + await database.pollVoteDao.updatePollVotes([ + PollVote( + id: 'v1', + pollId: pollId, + userId: dbUser.id, + user: dbUser, + optionId: optionA.id, + createdAt: DateTime.now(), + ), + PollVote( + id: 'v2', + pollId: pollId, + userId: dbUser.id, + user: dbUser, + optionId: optionB.id, + createdAt: DateTime.now().add(const Duration(seconds: 1)), + ), + PollVote( + id: 'v3', + pollId: pollId, + userId: otherUser.id, + user: otherUser, + optionId: optionA.id, + createdAt: DateTime.now().add(const Duration(seconds: 2)), + ), + PollVote( + id: 'v4', + pollId: pollId, + userId: otherUser.id, + user: otherUser, + optionId: optionB.id, + createdAt: DateTime.now().add(const Duration(seconds: 3)), + ), + PollVote( + id: 'a1', + pollId: pollId, + userId: dbUser.id, + user: dbUser, + answerText: 'because', + createdAt: DateTime.now().add(const Duration(seconds: 4)), + ), + ]); + + final fetched = await messageDao.getMessagesByCid(cid); + expect(fetched, hasLength(1)); + final hydratedPoll = fetched.first.poll; + expect(hydratedPoll, isNotNull); + expect(hydratedPoll!.id, pollId); + expect(hydratedPoll.latestVotesByOption[optionA.id], hasLength(2)); + expect(hydratedPoll.latestVotesByOption[optionB.id], hasLength(2)); + expect(hydratedPoll.latestAnswers, hasLength(1)); + // Own votes + own answer => 3 total in ownVotesAndAnswers. + expect(hydratedPoll.ownVotesAndAnswers, hasLength(3)); + expect( + hydratedPoll.ownVotesAndAnswers.every((v) => v.userId == dbUser.id), + isTrue, + ); + }); + + test( + 'getMessagesByCid hydrates thread draft when fetchDraft=true; ' + 'null when false', () async { + // `fetchDraft` attaches a thread draft (parentId == message.id) to its + // parent message, not the channel-level draft. See changelog entry for + // 9.15.0. + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + const parentId = 'msg-with-draft'; + await messageDao.updateMessages(cid, [ + Message( + id: parentId, + type: 'regular', + user: dbUser, + text: 'msg', + createdAt: DateTime.now(), + ), + ]); + + await database.draftMessageDao.updateDraftMessages([ + Draft( + channelCid: cid, + parentId: parentId, + createdAt: DateTime.now(), + message: DraftMessage( + id: 'draft-0', + text: 'unsent', + parentId: parentId, + ), + ), + ]); + + final withDraft = await messageDao.getMessagesByCid(cid); + expect(withDraft.first.draft, isNotNull); + expect(withDraft.first.draft!.message.text, 'unsent'); + expect(withDraft.first.draft!.parentId, parentId); + + final withoutDraft = + await messageDao.getMessagesByCid(cid, fetchDraft: false); + expect(withoutDraft.first.draft, isNull); + }); + + test( + 'getMessagesByCid hydrates quoted message with its own reactions ' + 'and poll', () async { + await _seedChannel(cid); + + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + const pollId = 'poll-on-quoted'; + const quotedMessageId = 'msg-quoted'; + const quotingMessageId = 'msg-quoting'; + + final poll = Poll( + id: pollId, + name: 'Quoted poll', + options: const [ + PollOption(id: 'q-opt-a', text: 'A'), + PollOption(id: 'q-opt-b', text: 'B'), + ], + createdBy: dbUser, + createdById: dbUser.id, + ); + await database.pollDao.updatePolls([poll]); + + final baseTime = DateTime.now(); + await messageDao.updateMessages(cid, [ + Message( + id: quotedMessageId, + type: 'regular', + user: dbUser, + text: 'first', + createdAt: baseTime, + pollId: pollId, + ), + Message( + id: quotingMessageId, + type: 'regular', + user: dbUser, + text: 'second', + createdAt: baseTime.add(const Duration(seconds: 1)), + quotedMessageId: quotedMessageId, + ), + ]); + + await database.reactionDao.updateReactions([ + Reaction( + type: 'like', + messageId: quotedMessageId, + user: dbUser, + createdAt: baseTime, + ), + ]); + + final fetched = await messageDao.getMessagesByCid(cid); + final quoting = fetched.firstWhere((m) => m.id == quotingMessageId); + expect(quoting.quotedMessage, isNotNull); + expect(quoting.quotedMessage!.id, quotedMessageId); + expect(quoting.quotedMessage!.latestReactions, hasLength(1)); + expect(quoting.quotedMessage!.ownReactions, hasLength(1)); + expect(quoting.quotedMessage!.poll, isNotNull); + expect(quoting.quotedMessage!.poll!.id, pollId); + }); + + test('getMessagesByCid resolves a depth-2 quote chain', () async { + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + final baseTime = DateTime.now(); + await messageDao.updateMessages(cid, [ + Message( + id: 'C', + type: 'regular', + user: dbUser, + text: 'root', + createdAt: baseTime, + ), + Message( + id: 'B', + type: 'regular', + user: dbUser, + text: 'mid', + createdAt: baseTime.add(const Duration(seconds: 1)), + quotedMessageId: 'C', + ), + Message( + id: 'A', + type: 'regular', + user: dbUser, + text: 'top', + createdAt: baseTime.add(const Duration(seconds: 2)), + quotedMessageId: 'B', + ), + ]); + + final fetched = await messageDao.getMessagesByCid(cid); + final top = fetched.firstWhere((m) => m.id == 'A'); + expect(top.quotedMessage?.id, 'B'); + expect(top.quotedMessage?.quotedMessage?.id, 'C'); + }); + + test('getMessagesByCid hydrates reactions under pagination', () async { + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + final baseTime = DateTime.now(); + final messages = List.generate( + 30, + (i) => Message( + id: 'p-msg-$i', + type: 'regular', + user: dbUser, + text: 'msg $i', + createdAt: baseTime.add(Duration(seconds: i)), + ), + ); + await messageDao.updateMessages(cid, messages); + + // 2 reactions per message; surviving rows after pagination must still + // carry their full reaction set. + final reactions = [ + for (final m in messages) ...[ + Reaction( + type: 'r1', + messageId: m.id, + user: dbUser, + createdAt: m.createdAt, + ), + Reaction( + type: 'r2', + messageId: m.id, + user: dbUser, + createdAt: m.createdAt.add(const Duration(milliseconds: 1)), + ), + ], + ]; + await database.reactionDao.updateReactions(reactions); + + final page = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 10, + lessThan: 'p-msg-25', + ), + ); + expect(page, hasLength(10)); + for (final m in page) { + expect(m.latestReactions, hasLength(2)); + expect(m.ownReactions, hasLength(2)); + expect(m.latestReactions!.every((r) => r.messageId == m.id), isTrue); + } + }); + }); + tearDown(() async { await database.disconnect(); }); diff --git a/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart index 0bd5166e52..5dd37dbf8a 100644 --- a/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart @@ -426,6 +426,331 @@ void main() { ); }); + // Mirror of the `message_dao_test.dart` "hydration" group, scoped to the + // pinned-messages table + `pinnedMessageReactionDao`. Locks per-row + // hydration before the upcoming batched-hydration refactor. + group('hydration', () { + const cid = 'test:PinnedHydration'; + + Future _seedChannel(String channelCid) async { + await database.channelDao.updateChannels([ChannelModel(cid: channelCid)]); + } + + test('getMessageById hydrates multiple latest and own reactions', () async { + const messageId = 'pmsg-multi-reactions'; + await _seedChannel(cid); + + final dbUser = User(id: 'testUserId'); + final otherUser = User(id: 'otherUser'); + await database.userDao.updateUsers([dbUser, otherUser]); + + await pinnedMessageDao.updateMessages(cid, [ + Message( + id: messageId, + type: 'regular', + user: dbUser, + text: 'Hello', + createdAt: DateTime.now(), + ), + ]); + + await database.pinnedMessageReactionDao.updateReactions([ + Reaction( + type: 'like', + messageId: messageId, + user: dbUser, + createdAt: DateTime.now(), + ), + Reaction( + type: 'love', + messageId: messageId, + user: dbUser, + createdAt: DateTime.now().add(const Duration(seconds: 1)), + ), + Reaction( + type: 'wow', + messageId: messageId, + user: otherUser, + createdAt: DateTime.now().add(const Duration(seconds: 2)), + ), + ]); + + final fetched = await pinnedMessageDao.getMessageById(messageId); + expect(fetched, isNotNull); + expect(fetched!.latestReactions, hasLength(3)); + expect(fetched.ownReactions, hasLength(2)); + expect( + fetched.ownReactions!.every((r) => r.user?.id == dbUser.id), + isTrue, + ); + }); + + test('getMessagesByCid hydrates reactions per row independently', () async { + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + final baseTime = DateTime.now(); + final messages = List.generate( + 5, + (i) => Message( + id: 'pmsg-iso-$i', + type: 'regular', + user: dbUser, + text: 'Hello $i', + createdAt: baseTime.add(Duration(seconds: i)), + ), + ); + await pinnedMessageDao.updateMessages(cid, messages); + + final reactions = [ + for (var i = 0; i < messages.length; i++) ...[ + Reaction( + type: 'like-$i', + messageId: messages[i].id, + user: dbUser, + createdAt: baseTime.add(Duration(seconds: i)), + ), + Reaction( + type: 'love-$i', + messageId: messages[i].id, + user: dbUser, + createdAt: baseTime.add(Duration(seconds: i, milliseconds: 1)), + ), + ], + ]; + await database.pinnedMessageReactionDao.updateReactions(reactions); + + final fetched = await pinnedMessageDao.getMessagesByCid(cid); + expect(fetched, hasLength(5)); + for (final m in fetched) { + expect(m.latestReactions, hasLength(2)); + expect( + m.latestReactions!.map((r) => r.type).toSet(), + equals({ + 'like-${m.id.split('-').last}', + 'love-${m.id.split('-').last}', + }), + ); + } + }); + + test('getMessagesByCid hydrates poll with own + other-user votes', + () async { + const messageId = 'pmsg-with-poll'; + const pollId = 'ppoll-mixed'; + await _seedChannel(cid); + + final dbUser = User(id: 'testUserId'); + final otherUser = User(id: 'otherUser'); + await database.userDao.updateUsers([dbUser, otherUser]); + + const optionA = PollOption(id: 'p-opt-a', text: 'A'); + const optionB = PollOption(id: 'p-opt-b', text: 'B'); + + await database.pollDao.updatePolls([ + Poll( + id: pollId, + name: 'Pick one', + options: const [optionA, optionB], + createdBy: dbUser, + createdById: dbUser.id, + ), + ]); + + await pinnedMessageDao.updateMessages(cid, [ + Message( + id: messageId, + type: 'regular', + user: dbUser, + text: 'Vote please', + createdAt: DateTime.now(), + pollId: pollId, + ), + ]); + + await database.pollVoteDao.updatePollVotes([ + PollVote( + id: 'pv1', + pollId: pollId, + userId: dbUser.id, + user: dbUser, + optionId: optionA.id, + createdAt: DateTime.now(), + ), + PollVote( + id: 'pv2', + pollId: pollId, + userId: otherUser.id, + user: otherUser, + optionId: optionB.id, + createdAt: DateTime.now().add(const Duration(seconds: 1)), + ), + PollVote( + id: 'pa1', + pollId: pollId, + userId: dbUser.id, + user: dbUser, + answerText: 'because', + createdAt: DateTime.now().add(const Duration(seconds: 2)), + ), + ]); + + final fetched = await pinnedMessageDao.getMessagesByCid(cid); + expect(fetched, hasLength(1)); + final hydratedPoll = fetched.first.poll; + expect(hydratedPoll, isNotNull); + expect(hydratedPoll!.id, pollId); + expect(hydratedPoll.latestAnswers, hasLength(1)); + // 1 own vote + 1 own answer = 2. + expect(hydratedPoll.ownVotesAndAnswers, hasLength(2)); + }); + + test( + 'getMessagesByCid hydrates thread draft when fetchDraft=true; ' + 'null when false', () async { + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + const parentId = 'pmsg-with-draft'; + final parentMessage = Message( + id: parentId, + type: 'regular', + user: dbUser, + text: 'msg', + createdAt: DateTime.now(), + ); + // Pin the message and ALSO insert it into the main `messages` table: + // `DraftMessages.parentId` is FK-referenced against `Messages.id`, not + // `PinnedMessages.id`, so a thread draft needs the row in both places. + await pinnedMessageDao.updateMessages(cid, [parentMessage]); + await database.messageDao.updateMessages(cid, [parentMessage]); + + await database.draftMessageDao.updateDraftMessages([ + Draft( + channelCid: cid, + parentId: parentId, + createdAt: DateTime.now(), + message: DraftMessage( + id: 'pdraft-0', + text: 'unsent', + parentId: parentId, + ), + ), + ]); + + final withDraft = await pinnedMessageDao.getMessagesByCid(cid); + expect(withDraft.first.draft, isNotNull); + expect(withDraft.first.draft!.parentId, parentId); + + final withoutDraft = + await pinnedMessageDao.getMessagesByCid(cid, fetchDraft: false); + expect(withoutDraft.first.draft, isNull); + }); + + test( + 'getMessagesByCid hydrates quoted pinned message with its own ' + 'reactions and poll', () async { + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + const pollId = 'ppoll-on-quoted'; + const quotedMessageId = 'pmsg-quoted'; + const quotingMessageId = 'pmsg-quoting'; + + await database.pollDao.updatePolls([ + Poll( + id: pollId, + name: 'Quoted poll', + options: const [ + PollOption(id: 'pq-opt-a', text: 'A'), + PollOption(id: 'pq-opt-b', text: 'B'), + ], + createdBy: dbUser, + createdById: dbUser.id, + ), + ]); + + final baseTime = DateTime.now(); + await pinnedMessageDao.updateMessages(cid, [ + Message( + id: quotedMessageId, + type: 'regular', + user: dbUser, + text: 'first', + createdAt: baseTime, + pollId: pollId, + ), + Message( + id: quotingMessageId, + type: 'regular', + user: dbUser, + text: 'second', + createdAt: baseTime.add(const Duration(seconds: 1)), + quotedMessageId: quotedMessageId, + ), + ]); + + await database.pinnedMessageReactionDao.updateReactions([ + Reaction( + type: 'like', + messageId: quotedMessageId, + user: dbUser, + createdAt: baseTime, + ), + ]); + + final fetched = await pinnedMessageDao.getMessagesByCid(cid); + final quoting = fetched.firstWhere((m) => m.id == quotingMessageId); + expect(quoting.quotedMessage, isNotNull); + expect(quoting.quotedMessage!.id, quotedMessageId); + expect(quoting.quotedMessage!.latestReactions, hasLength(1)); + expect(quoting.quotedMessage!.ownReactions, hasLength(1)); + expect(quoting.quotedMessage!.poll, isNotNull); + expect(quoting.quotedMessage!.poll!.id, pollId); + }); + + test('getMessagesByCid resolves a depth-2 quote chain', () async { + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + final baseTime = DateTime.now(); + await pinnedMessageDao.updateMessages(cid, [ + Message( + id: 'pC', + type: 'regular', + user: dbUser, + text: 'root', + createdAt: baseTime, + ), + Message( + id: 'pB', + type: 'regular', + user: dbUser, + text: 'mid', + createdAt: baseTime.add(const Duration(seconds: 1)), + quotedMessageId: 'pC', + ), + Message( + id: 'pA', + type: 'regular', + user: dbUser, + text: 'top', + createdAt: baseTime.add(const Duration(seconds: 2)), + quotedMessageId: 'pB', + ), + ]); + + final fetched = await pinnedMessageDao.getMessagesByCid(cid); + final top = fetched.firstWhere((m) => m.id == 'pA'); + expect(top.quotedMessage?.id, 'pB'); + expect(top.quotedMessage?.quotedMessage?.id, 'pC'); + }); + }); + tearDown(() async { await database.disconnect(); }); diff --git a/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart index 640399ef87..80fe9a6552 100644 --- a/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart @@ -150,6 +150,19 @@ void main() { ); }); + test( + 'getReactions returns empty for a message id with no reactions, ' + 'even when reactions exist for other messages', () async { + const messageWithReactions = 'pmsg-A'; + const messageWithoutReactions = 'pmsg-B'; + + await _prepareReactionData(messageWithReactions); + + final fetched = + await pinnedMessageReactionDao.getReactions(messageWithoutReactions); + expect(fetched, isEmpty); + }); + group('deleteReactionsByMessageIds', () { const messageId1 = 'testMessageId1'; const messageId2 = 'testMessageId2'; diff --git a/packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart index 3f0b551670..8bc7503e66 100644 --- a/packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart @@ -126,6 +126,82 @@ void main() { expect(fetchedPoll!.id, pollToFetch.id); }); + test('getPollById returns null for an unknown id', () async { + final poll = await pollDao.getPollById('does-not-exist'); + expect(poll, isNull); + }); + + test( + 'getPollById hydrates ownVotesAndAnswers + latestVotesByOption + ' + 'latestAnswers from mixed-user/mixed-option votes', () async { + const pollId = 'poll-mixed'; + const optionA = PollOption(id: 'opt-a', text: 'A'); + const optionB = PollOption(id: 'opt-b', text: 'B'); + + // The test database is built with `testUserId`, so a vote with that + // userId is "own". Other userIds end up only in the latest-by-option + // / latest-answers buckets. + final dbUser = User(id: 'testUserId'); + final otherUser = User(id: 'otherUser'); + await database.userDao.updateUsers([dbUser, otherUser]); + await pollDao.updatePolls([ + Poll( + id: pollId, + name: 'Pick one', + options: const [optionA, optionB], + createdBy: dbUser, + createdById: dbUser.id, + ), + ]); + await database.pollVoteDao.updatePollVotes([ + PollVote( + id: 'own-vote-a', + pollId: pollId, + userId: dbUser.id, + user: dbUser, + optionId: optionA.id, + createdAt: DateTime.now(), + ), + PollVote( + id: 'other-vote-b', + pollId: pollId, + userId: otherUser.id, + user: otherUser, + optionId: optionB.id, + createdAt: DateTime.now().add(const Duration(seconds: 1)), + ), + PollVote( + id: 'own-answer', + pollId: pollId, + userId: dbUser.id, + user: dbUser, + answerText: 'because', + createdAt: DateTime.now().add(const Duration(seconds: 2)), + ), + PollVote( + id: 'other-answer', + pollId: pollId, + userId: otherUser.id, + user: otherUser, + answerText: 'just because', + createdAt: DateTime.now().add(const Duration(seconds: 3)), + ), + ]); + + final fetched = await pollDao.getPollById(pollId); + + expect(fetched, isNotNull); + expect(fetched!.latestVotesByOption[optionA.id], hasLength(1)); + expect(fetched.latestVotesByOption[optionB.id], hasLength(1)); + expect(fetched.latestAnswers, hasLength(2)); + // 1 own vote + 1 own answer => 2. + expect(fetched.ownVotesAndAnswers, hasLength(2)); + expect( + fetched.ownVotesAndAnswers.every((v) => v.userId == dbUser.id), + isTrue, + ); + }); + test('deletePollsByIds', () async { // Preparing test data final insertedPolls = await _preparePollData(); diff --git a/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart index 38c0ff7d58..4c358722f8 100644 --- a/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart @@ -147,6 +147,83 @@ void main() { ); }); + test('getReactionsForMessages chunks transparently when given >900 ids', + () async { + // 1,200 ids exceeds the historical SQLITE_MAX_VARIABLE_NUMBER cap (999) + // for a single `WHERE messageId IN (?, ?, ...)` statement — the helper + // must run the SELECT in chunks and merge. + const cid = 'test:ChunkedReactions'; + const total = 1200; + + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + await database.channelDao.updateChannels([ChannelModel(cid: cid)]); + + final baseTime = DateTime.now(); + final messages = List.generate( + total, + (i) => Message( + id: 'cmsg-$i', + type: 'regular', + user: dbUser, + text: 'Hello $i', + createdAt: baseTime.add(Duration(seconds: i)), + ), + ); + await database.messageDao.updateMessages(cid, messages); + + // Seed 1 reaction on every odd-indexed message (600 total) so we can + // verify both that the chunked query returns rows AND that ids without + // reactions still appear in the dense map with an empty list. + final reactions = [ + for (var i = 0; i < total; i++) + if (i.isOdd) + Reaction( + type: 'like', + messageId: messages[i].id, + user: dbUser, + createdAt: baseTime.add(Duration(seconds: i)), + ), + ]; + await reactionDao.updateReactions(reactions); + + final ids = messages.map((m) => m.id).toList(); + final grouped = await reactionDao.getReactionsForMessages(ids); + + expect(grouped, hasLength(total), + reason: 'every input id must be a key (dense-map contract)'); + var withReactions = 0; + var empty = 0; + for (var i = 0; i < total; i++) { + final list = grouped['cmsg-$i']; + expect(list, isNotNull); + if (i.isOdd) { + expect(list, hasLength(1)); + expect(list!.first.messageId, 'cmsg-$i'); + withReactions++; + } else { + expect(list, isEmpty); + empty++; + } + } + expect(withReactions, total ~/ 2); + expect(empty, total ~/ 2); + }); + + test( + 'getReactions returns empty for a message id with no reactions, ' + 'even when reactions exist for other messages', () async { + // Locks per-id isolation: the upcoming batched `WHERE messageId IN (...)` + // path must not leak rows across ids when only one is queried. + const messageWithReactions = 'msg-A'; + const messageWithoutReactions = 'msg-B'; + + await _prepareReactionData(messageWithReactions); + + final fetched = await reactionDao.getReactions(messageWithoutReactions); + expect(fetched, isEmpty); + }); + group('deleteReactionsByMessageIds', () { const messageId1 = 'testMessageId1'; const messageId2 = 'testMessageId2'; From 9553c2d43cddbcc04c778de38d13820d7c22eba0 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Fri, 22 May 2026 15:26:43 +0200 Subject: [PATCH 08/17] refactor(dao): add tests for new methods --- .../src/dao/pinned_message_reaction_dao.dart | 11 ++- .../lib/src/dao/reaction_dao.dart | 11 ++- .../test/src/dao/draft_message_dao_test.dart | 97 +++++++++++++++++++ .../test/src/dao/poll_dao_test.dart | 83 ++++++++++++++++ .../test/src/dao/poll_vote_dao_test.dart | 43 ++++++++ .../test/src/dao/reaction_dao_test.dart | 71 ++++++++++++++ 6 files changed, 312 insertions(+), 4 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart index c319809fd0..27f5f4439e 100644 --- a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart @@ -16,7 +16,11 @@ class PinnedMessageReactionDao extends DatabaseAccessor PinnedMessageReactionDao(super.db); /// Returns all the reactions of a particular message by matching - /// [Reactions.messageId] with [messageId] + /// [Reactions.messageId] with [messageId]. + /// + /// Not used in production — `PinnedMessageDao` hydrates via the batched + /// [getReactionsForMessages]. Kept for convenience (tests + ad-hoc + /// single-id lookups). Future> getReactions(String messageId) { final where = pinnedMessageReactions.messageId.equals(messageId); return _selectReactions(where); @@ -25,7 +29,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor /// Returns all the reactions of a particular message /// added by a particular user by matching /// [Reactions.messageId] with [messageId] and - /// [Reactions.userId] with [userId] + /// [Reactions.userId] with [userId]. + /// + /// Not used in production — `PinnedMessageDao` hydrates via the batched + /// [getReactionsForMessagesByUserId]. Kept for convenience. Future> getReactionsByUserId( String messageId, String userId, diff --git a/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart b/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart index 8a8cd2bf57..43c41a8bea 100644 --- a/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart @@ -16,7 +16,11 @@ class ReactionDao extends DatabaseAccessor ReactionDao(super.db); /// Returns all the reactions of a particular message by matching - /// [Reactions.messageId] with [messageId] + /// [Reactions.messageId] with [messageId]. + /// + /// Not used in production — `MessageDao` hydrates via the batched + /// [getReactionsForMessages]. Kept for convenience (tests + ad-hoc + /// single-id lookups). Future> getReactions(String messageId) { final where = reactions.messageId.equals(messageId); return _selectReactions(where); @@ -25,7 +29,10 @@ class ReactionDao extends DatabaseAccessor /// Returns all the reactions of a particular message /// added by a particular user by matching /// [Reactions.messageId] with [messageId] and - /// [Reactions.userId] with [userId] + /// [Reactions.userId] with [userId]. + /// + /// Not used in production — `MessageDao` hydrates via the batched + /// [getReactionsForMessagesByUserId]. Kept for convenience. Future> getReactionsByUserId( String messageId, String userId, diff --git a/packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart index 9de6fc71c8..871521472d 100644 --- a/packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart @@ -549,4 +549,101 @@ void main() { }, ); }); + + group('getDraftMessagesByParentIds', () { + test('returns empty map for empty input ids', () async { + final result = await draftMessageDao + .getDraftMessagesByParentIds('any-cid', const []); + expect(result, isEmpty); + }); + + test( + 'returns the thread draft per parent id within the given channel; ' + 'parents without a thread draft (or drafts in other channels) map ' + 'to null', () async { + const cidA = 'test:cidA'; + const cidB = 'test:cidB'; + const parentWithDraft = 'parent-with-draft'; + const parentWithoutDraft = 'parent-without-draft'; + const parentInOtherChannel = 'parent-in-other-channel'; + const parentUnknown = 'parent-unknown'; + + final user = User(id: 'testUserId'); + await database.userDao.updateUsers([user]); + await database.channelDao.updateChannels([ + ChannelModel(cid: cidA), + ChannelModel(cid: cidB), + ]); + await database.messageDao.updateMessages(cidA, [ + Message( + id: parentWithDraft, + user: user, + createdAt: DateTime.now(), + text: 'A', + ), + Message( + id: parentWithoutDraft, + user: user, + createdAt: DateTime.now(), + text: 'B', + ), + ]); + await database.messageDao.updateMessages(cidB, [ + Message( + id: parentInOtherChannel, + user: user, + createdAt: DateTime.now(), + text: 'C', + ), + ]); + // One draft in cidA on parentWithDraft, and one draft in cidB to + // confirm the cid filter excludes it from the cidA lookup. + await draftMessageDao.updateDraftMessages([ + Draft( + channelCid: cidA, + parentId: parentWithDraft, + createdAt: DateTime.now(), + message: DraftMessage( + text: 'draft in cidA', + parentId: parentWithDraft, + ), + ), + Draft( + channelCid: cidB, + parentId: parentInOtherChannel, + createdAt: DateTime.now(), + message: DraftMessage( + text: 'draft in cidB', + parentId: parentInOtherChannel, + ), + ), + ]); + + final result = await draftMessageDao.getDraftMessagesByParentIds( + cidA, + const [ + parentWithDraft, + parentWithoutDraft, + parentInOtherChannel, + parentUnknown, + ], + ); + + expect( + result.keys, + unorderedEquals([ + parentWithDraft, + parentWithoutDraft, + parentInOtherChannel, + parentUnknown, + ]), + ); + expect(result[parentWithDraft], isNotNull); + expect(result[parentWithDraft]!.parentId, parentWithDraft); + expect(result[parentWithDraft]!.message.text, 'draft in cidA'); + expect(result[parentWithoutDraft], isNull); + expect(result[parentInOtherChannel], isNull); + expect(result[parentUnknown], isNull); + }); + }); } diff --git a/packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart index 8bc7503e66..37fe19f905 100644 --- a/packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart @@ -240,6 +240,89 @@ void main() { expect(fetchedPollVotes, isEmpty); }); + group('getPollsByIds', () { + test('returns empty map for empty input ids', () async { + final result = await pollDao.getPollsByIds(const []); + expect(result, isEmpty); + }); + + test( + 'returns a poll per id with votes/answers/ownVotes grouped; ids not ' + 'in the cache map to null', () async { + const optionA = PollOption(id: 'opt-a', text: 'A'); + const optionB = PollOption(id: 'opt-b', text: 'B'); + + final dbUser = User(id: 'testUserId'); + final otherUser = User(id: 'otherUser'); + await database.userDao.updateUsers([dbUser, otherUser]); + await pollDao.updatePolls([ + Poll( + id: 'poll-1', + name: 'Pick one', + options: const [optionA, optionB], + createdBy: dbUser, + createdById: dbUser.id, + ), + Poll( + id: 'poll-2', + name: 'No votes', + options: const [optionA, optionB], + createdBy: dbUser, + createdById: dbUser.id, + ), + ]); + await database.pollVoteDao.updatePollVotes([ + PollVote( + id: 'p1-own-a', + pollId: 'poll-1', + userId: dbUser.id, + user: dbUser, + optionId: optionA.id, + createdAt: DateTime.now(), + ), + PollVote( + id: 'p1-other-b', + pollId: 'poll-1', + userId: otherUser.id, + user: otherUser, + optionId: optionB.id, + createdAt: DateTime.now(), + ), + PollVote( + id: 'p1-own-answer', + pollId: 'poll-1', + userId: dbUser.id, + user: dbUser, + answerText: 'because', + createdAt: DateTime.now(), + ), + ]); + + final result = await pollDao + .getPollsByIds(const ['poll-1', 'poll-2', 'poll-missing']); + + expect( + result.keys, unorderedEquals(['poll-1', 'poll-2', 'poll-missing'])); + expect(result['poll-missing'], isNull); + + final poll1 = result['poll-1']!; + expect(poll1.latestVotesByOption[optionA.id], hasLength(1)); + expect(poll1.latestVotesByOption[optionB.id], hasLength(1)); + expect(poll1.latestAnswers, hasLength(1)); + // 1 own vote + 1 own answer = 2. + expect(poll1.ownVotesAndAnswers, hasLength(2)); + expect( + poll1.ownVotesAndAnswers.every((v) => v.userId == dbUser.id), + isTrue, + ); + + final poll2 = result['poll-2']!; + expect(poll2.latestVotesByOption, isEmpty); + expect(poll2.latestAnswers, isEmpty); + expect(poll2.ownVotesAndAnswers, isEmpty); + }); + }); + tearDown(() async { await database.disconnect(); }); diff --git a/packages/stream_chat_persistence/test/src/dao/poll_vote_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/poll_vote_dao_test.dart index 093aac84bf..75c8ac6f12 100644 --- a/packages/stream_chat_persistence/test/src/dao/poll_vote_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/poll_vote_dao_test.dart @@ -171,6 +171,49 @@ void main() { }); }); + group('getPollVotesForPolls', () { + test('returns empty map for empty input ids', () async { + final result = await pollVoteDao.getPollVotesForPolls(const []); + expect(result, isEmpty); + }); + + test( + 'returns votes grouped by poll id; polls with no votes (or unknown ' + 'ids) map to an empty list', () async { + const pollWithVotes = 'poll-with-votes'; + const pollWithoutVotes = 'poll-without-votes'; + const pollUnknown = 'poll-unknown'; + + // _preparePollVoteData seeds the poll plus 3 votes per option (9 + // total) and inserts the necessary users. + final seededVotes = await _preparePollVoteData(pollWithVotes); + // Seed a second poll with no votes — just to confirm cross-poll + // isolation (votes from pollWithVotes must not leak into its bucket). + await database.pollDao.updatePolls([ + Poll( + id: pollWithoutVotes, + name: 'No votes here', + options: const [PollOption(id: 'opt', text: 'X')], + createdById: 'user-0', + ), + ]); + + final result = await pollVoteDao.getPollVotesForPolls( + const [pollWithVotes, pollWithoutVotes, pollUnknown], + ); + + expect(result.keys, + unorderedEquals([pollWithVotes, pollWithoutVotes, pollUnknown])); + expect(result[pollWithVotes], hasLength(seededVotes.length)); + expect( + result[pollWithVotes]!.every((v) => v.pollId == pollWithVotes), + isTrue, + ); + expect(result[pollWithoutVotes], isEmpty); + expect(result[pollUnknown], isEmpty); + }); + }); + tearDown(() async { await database.disconnect(); }); diff --git a/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart index 4c358722f8..debef2a237 100644 --- a/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart @@ -224,6 +224,77 @@ void main() { expect(fetched, isEmpty); }); + group('getReactionsForMessagesByUserId', () { + test('returns empty map for empty input ids', () async { + final result = await reactionDao + .getReactionsForMessagesByUserId(const [], 'someUser'); + expect(result, isEmpty); + }); + + test( + 'returns the given user\'s reactions per message id; message ids ' + 'with no reactions from that user map to an empty list', () async { + const cid = 'test:Cid'; + const targetUser = 'targetUser'; + const otherUser = 'otherUser'; + const msgWithOwn = 'msg-with-own'; + const msgWithoutOwn = 'msg-without-own'; + const msgUnknown = 'msg-unknown'; + + final users = [User(id: targetUser), User(id: otherUser)]; + await database.userDao.updateUsers(users); + await database.channelDao.updateChannels([ChannelModel(cid: cid)]); + await database.messageDao.updateMessages(cid, [ + Message( + id: msgWithOwn, + user: users.first, + createdAt: DateTime.now(), + text: 'a', + ), + Message( + id: msgWithoutOwn, + user: users.first, + createdAt: DateTime.now(), + text: 'b', + ), + ]); + // msgWithOwn: 1 own + 1 other; msgWithoutOwn: only other-user reaction. + await reactionDao.updateReactions([ + Reaction( + type: 'like', + messageId: msgWithOwn, + userId: targetUser, + createdAt: DateTime.now(), + ), + Reaction( + type: 'wow', + messageId: msgWithOwn, + userId: otherUser, + createdAt: DateTime.now(), + ), + Reaction( + type: 'love', + messageId: msgWithoutOwn, + userId: otherUser, + createdAt: DateTime.now(), + ), + ]); + + final result = await reactionDao.getReactionsForMessagesByUserId( + const [msgWithOwn, msgWithoutOwn, msgUnknown], + targetUser, + ); + + expect(result.keys, + unorderedEquals([msgWithOwn, msgWithoutOwn, msgUnknown])); + expect(result[msgWithOwn], hasLength(1)); + expect(result[msgWithOwn]!.single.userId, targetUser); + expect(result[msgWithOwn]!.single.type, 'like'); + expect(result[msgWithoutOwn], isEmpty); + expect(result[msgUnknown], isEmpty); + }); + }); + group('deleteReactionsByMessageIds', () { const messageId1 = 'testMessageId1'; const messageId2 = 'testMessageId2'; From b580ccdc473fc06e537b622915aa7600e4ece87d Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Mon, 25 May 2026 11:33:46 +0200 Subject: [PATCH 09/17] refactor(dao): enhance message pagination logic to support inclusive cursors --- packages/stream_chat_persistence/CHANGELOG.md | 9 ++ .../lib/src/dao/message_dao.dart | 81 ++++++++++++---- .../test/src/dao/message_dao_test.dart | 96 ++++++++++++++++++- 3 files changed, 162 insertions(+), 24 deletions(-) diff --git a/packages/stream_chat_persistence/CHANGELOG.md b/packages/stream_chat_persistence/CHANGELOG.md index 147de6e493..74113f71f4 100644 --- a/packages/stream_chat_persistence/CHANGELOG.md +++ b/packages/stream_chat_persistence/CHANGELOG.md @@ -6,6 +6,15 @@ - Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. - Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. +✅ Added + +- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`. + +🐞 Fixed + +- `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour. +- `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot. + ## 9.24.0 diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index 2aed79cda5..f1396c786c 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -169,14 +169,49 @@ class MessageDao extends DatabaseAccessor bool fetchDraft = true, PaginationParams? messagePagination, }) async { - final lessThanCutoff = await switch (messagePagination?.lessThan) { - final id? => _lookupMessageCreatedAt(id), - _ => null, - }; - final greaterThanCutoff = await switch (messagePagination?.greaterThan) { - final id? => _lookupMessageCreatedAt(id), - _ => null, - }; + final ( + lessThanCutoff, + lessThanOrEqualCutoff, + greaterThanCutoff, + greaterThanOrEqualCutoff, + ) = await ( + switch (messagePagination?.lessThan) { + final id? => _lookupMessageCreatedAt(id), + _ => Future.value(), + }, + switch (messagePagination?.lessThanOrEqual) { + final id? => _lookupMessageCreatedAt(id), + _ => Future.value(), + }, + switch (messagePagination?.greaterThan) { + final id? => _lookupMessageCreatedAt(id), + _ => Future.value(), + }, + switch (messagePagination?.greaterThanOrEqual) { + final id? => _lookupMessageCreatedAt(id), + _ => Future.value(), + }, + ).wait; + + // When the caller is paginating forward (greaterThan / greaterThanOrEqual + // only), order ASC so the SQL `LIMIT` retains the N messages immediately + // AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N most + // recent (closest to a `lessThan` cursor, or the channel tail when no + // cursor is set). The final result is always reshaped to ASC for display. + final isForwardPagination = + (greaterThanCutoff != null || greaterThanOrEqualCutoff != null) && + lessThanCutoff == null && + lessThanOrEqualCutoff == null; + + final orderBy = isForwardPagination + ? [ + OrderingTerm.asc(messages.createdAt), + OrderingTerm.asc(messages.id), + ] + : [ + OrderingTerm.desc(messages.createdAt), + OrderingTerm.desc(messages.id), + ]; final query = select(messages).join([ leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), @@ -187,15 +222,18 @@ class MessageDao extends DatabaseAccessor ]) ..where(messages.channelCid.equals(cid)) ..where(messages.parentId.isNull() | messages.showInChannel.equals(true)) - ..orderBy([ - OrderingTerm.desc(messages.createdAt), - OrderingTerm.desc(messages.id), - ]); + ..orderBy(orderBy); if (lessThanCutoff case final t?) { query.where(messages.createdAt.isSmallerThanValue(t)); } + if (lessThanOrEqualCutoff case final t?) { + query.where(messages.createdAt.isSmallerOrEqualValue(t)); + } if (greaterThanCutoff case final t?) { + query.where(messages.createdAt.isBiggerThanValue(t)); + } + if (greaterThanOrEqualCutoff case final t?) { query.where(messages.createdAt.isBiggerOrEqualValue(t)); } @@ -203,11 +241,12 @@ class MessageDao extends DatabaseAccessor query.limit(messagePagination.limit); } - final rows = (await query.get()).reversed.toList(); - if (rows.isEmpty) return []; + final rows = await query.get(); + final orderedRows = isForwardPagination ? rows : rows.reversed.toList(); return Future.wait( - rows.map((row) => _messageFromJoinRow(row, fetchDraft: fetchDraft)), + orderedRows + .map((row) => _messageFromJoinRow(row, fetchDraft: fetchDraft)), ); } @@ -234,13 +273,17 @@ class MessageDao extends DatabaseAccessor } /// Returns the `createdAt` of the message with [id] in the local cache, - /// or `null` if the message isn't cached. Used by [getMessagesByCid] to - /// resolve `lessThan` / `greaterThan` pagination cursors into SQL-comparable - /// timestamps so the filter runs in SQL instead of after the fact in Dart. + /// or `null` if the message isn't cached or isn't visible in the channel + /// (i.e. a thread reply with `showInChannel = false`). The visibility + /// predicate mirrors the main [getMessagesByCid] query so a hidden cursor + /// behaves as a no-op, matching the pre-SQL-filter behaviour. Future _lookupMessageCreatedAt(String id) { return (selectOnly(messages) ..addColumns([messages.createdAt]) - ..where(messages.id.equals(id))) + ..where(messages.id.equals(id)) + ..where( + messages.parentId.isNull() | messages.showInChannel.equals(true), + )) .map((row) => row.read(messages.createdAt)) .getSingleOrNull(); } diff --git a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart index c4f2af1518..7b8d69a05c 100644 --- a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart @@ -420,7 +420,8 @@ void main() { expect(fetchedMessages.last.id, 'testMessageId${cid}24'); }); - test('greaterThan only trims messages from the start', () async { + test('greaterThan only trims messages from the start (exclusive)', + () async { await _prepareTestData(cid, count: 30); final fetchedMessages = await messageDao.getMessagesByCid( @@ -431,8 +432,8 @@ void main() { ), ); - expect(fetchedMessages.length, 25); - expect(fetchedMessages.first.id, 'testMessageId${cid}5'); + expect(fetchedMessages.length, 24); + expect(fetchedMessages.first.id, 'testMessageId${cid}6'); expect(fetchedMessages.last.id, 'testMessageId${cid}29'); }); @@ -481,6 +482,27 @@ void main() { expect(fetchedMessages.last.id, 'testMessageId${cid}29'); }); + test('thread-reply id as cursor is a no-op (not visible in channel)', + () async { + // `_prepareTestData` inserts thread replies with `parentId` set and + // `showInChannel` left null — i.e. not visible in the channel query. + // Passing such an id as a cursor must resolve to a no-op so the main + // query falls back to returning the full channel slice. + await _prepareTestData(cid, count: 30, threads: true); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'testThreadMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + test('default PaginationParams() applies implicit limit of 10', () async { await _prepareTestData(cid, count: 30); @@ -509,7 +531,7 @@ void main() { expect(fetchedMessages.last.id, 'testMessageId${cid}24'); }); - test('default limit + greaterThan returns last 10 of filtered set', + test('default limit + greaterThan returns first 10 after the pivot', () async { await _prepareTestData(cid, count: 30); @@ -521,9 +543,73 @@ void main() { ); expect(fetchedMessages.length, 10); - expect(fetchedMessages.first.id, 'testMessageId${cid}20'); + expect(fetchedMessages.first.id, 'testMessageId${cid}6'); + expect(fetchedMessages.last.id, 'testMessageId${cid}15'); + }); + + test('lessThanOrEqual is inclusive of the pivot', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThanOrEqual: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 26); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}25'); + }); + + test('greaterThanOrEqual is inclusive of the pivot', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThanOrEqual: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 25); + expect(fetchedMessages.first.id, 'testMessageId${cid}5'); expect(fetchedMessages.last.id, 'testMessageId${cid}29'); }); + + test('default limit + lessThanOrEqual returns the pivot and 9 before', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + lessThanOrEqual: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}16'); + expect(fetchedMessages.last.id, 'testMessageId${cid}25'); + }); + + test('default limit + greaterThanOrEqual returns the pivot and 9 after', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + greaterThanOrEqual: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}5'); + expect(fetchedMessages.last.id, 'testMessageId${cid}14'); + }); }); test('updateMessages', () async { From 52f0c265685f52b0d75982eccd183ef7d80e9a92 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Mon, 25 May 2026 11:35:05 +0200 Subject: [PATCH 10/17] refactor(dao): Update docs --- packages/stream_chat_persistence/lib/src/dao/message_dao.dart | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index f1396c786c..6d53bdf171 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -274,9 +274,7 @@ class MessageDao extends DatabaseAccessor /// Returns the `createdAt` of the message with [id] in the local cache, /// or `null` if the message isn't cached or isn't visible in the channel - /// (i.e. a thread reply with `showInChannel = false`). The visibility - /// predicate mirrors the main [getMessagesByCid] query so a hidden cursor - /// behaves as a no-op, matching the pre-SQL-filter behaviour. + /// (i.e. a thread reply with `showInChannel = false`). Future _lookupMessageCreatedAt(String id) { return (selectOnly(messages) ..addColumns([messages.createdAt]) From 0dfd4fe7a485f4805868e9150038f079828b6d86 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Mon, 25 May 2026 12:24:59 +0200 Subject: [PATCH 11/17] refactor(dao): Remove legacy method --- .../lib/src/dao/message_dao.dart | 63 ------------------- 1 file changed, 63 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index 7912b1adad..c6b925b1c4 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -1,7 +1,4 @@ -import 'dart:math'; - import 'package:drift/drift.dart'; -import 'package:flutter/foundation.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; import 'package:stream_chat_persistence/src/entity/messages.dart'; @@ -286,66 +283,6 @@ class MessageDao extends DatabaseAccessor return msgList; } - /// Pre-SQL-pushdown reference implementation of [getMessagesByCid]. Fetches - /// every cached message for the channel, hydrates each row, then trims the - /// result in Dart. Kept only as the head-to-head baseline for the - /// `get_messages_by_cid_bench_test.dart` benchmark — remove once we no - /// longer need behavioral parity proof. - @visibleForTesting - Future> getMessagesByCidLegacy( - String cid, { - bool fetchDraft = true, - PaginationParams? messagePagination, - }) async { - final query = select(messages).join([ - leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), - leftOuterJoin( - _pinnedByUsers, - messages.pinnedByUserId.equalsExp(_pinnedByUsers.id), - ), - ]) - ..where(messages.channelCid.equals(cid)) - ..where(messages.parentId.isNull() | messages.showInChannel.equals(true)) - ..orderBy([OrderingTerm.asc(messages.createdAt)]); - - final result = await query.get(); - if (result.isEmpty) return []; - - final msgList = await Future.wait( - result.map( - (row) => _messageFromJoinRow( - row, - fetchDraft: fetchDraft, - ), - ), - ); - - if (msgList.isNotEmpty) { - if (messagePagination?.lessThan != null) { - final lessThanIndex = msgList.indexWhere( - (m) => m.id == messagePagination!.lessThan, - ); - if (lessThanIndex != -1) { - msgList.removeRange(lessThanIndex, msgList.length); - } - } - if (messagePagination?.greaterThan != null) { - final greaterThanIndex = msgList.indexWhere( - (m) => m.id == messagePagination!.greaterThan, - ); - if (greaterThanIndex != -1) { - msgList.removeRange(0, greaterThanIndex); - } - } - if (messagePagination?.limit != null) { - return msgList - .skip(max(0, msgList.length - messagePagination!.limit)) - .toList(); - } - } - return msgList; - } - /// Returns all the messages of a channel by matching /// [Messages.channelCid] with [parentId] Future> getMessagesByCid( From 80719e6298f275cad36233efc21091cc499b7a39 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Mon, 25 May 2026 12:29:19 +0200 Subject: [PATCH 12/17] refactor(dao): Remove legacy method --- .../lib/src/dao/pinned_message_dao.dart | 61 ------------------- 1 file changed, 61 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart index f556c68b71..e2dafb552d 100644 --- a/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart @@ -1,10 +1,8 @@ import 'package:drift/drift.dart'; -import 'package:flutter/foundation.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; import 'package:stream_chat_persistence/src/entity/pinned_messages.dart'; import 'package:stream_chat_persistence/src/entity/users.dart'; - import 'package:stream_chat_persistence/src/mapper/mapper.dart'; part 'pinned_message_dao.g.dart'; @@ -281,65 +279,6 @@ class PinnedMessageDao extends DatabaseAccessor return msgList; } - /// Pre-batched-hydration reference implementation of [getMessagesByCid]. - /// Same shape as the current public method but uses the per-row N+1 - /// [_messageFromJoinRow] path. Kept only as the head-to-head baseline for - /// `get_messages_by_cid_pinned_hydration_bench_test.dart` and the pinned - /// parity test — remove once we no longer need behavioral parity proof. - @visibleForTesting - Future> getMessagesByCidLegacy( - String cid, { - bool fetchDraft = true, - PaginationParams? messagePagination, - }) async { - final query = select(pinnedMessages).join([ - leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)), - leftOuterJoin( - _pinnedByUsers, - pinnedMessages.pinnedByUserId.equalsExp(_pinnedByUsers.id), - ), - ]) - ..where(pinnedMessages.channelCid.equals(cid)) - ..where(pinnedMessages.parentId.isNull() | - pinnedMessages.showInChannel.equals(true)) - ..orderBy([OrderingTerm.asc(pinnedMessages.createdAt)]); - - final result = await query.get(); - if (result.isEmpty) return []; - - final msgList = await Future.wait( - result.map( - (row) => _messageFromJoinRow( - row, - fetchDraft: fetchDraft, - ), - ), - ); - - if (msgList.isNotEmpty) { - if (messagePagination?.lessThan != null) { - final lessThanIndex = msgList.indexWhere( - (m) => m.id == messagePagination!.lessThan, - ); - if (lessThanIndex != -1) { - msgList.removeRange(lessThanIndex, msgList.length); - } - } - if (messagePagination?.greaterThan != null) { - final greaterThanIndex = msgList.indexWhere( - (m) => m.id == messagePagination!.greaterThan, - ); - if (greaterThanIndex != -1) { - msgList.removeRange(0, greaterThanIndex); - } - } - if (messagePagination?.limit != null) { - return msgList.take(messagePagination!.limit).toList(); - } - } - return msgList; - } - /// Returns all the messages of a channel by matching /// [PinnedMessages.channelCid] with [parentId] Future> getMessagesByCid( From 1da0c45646b5c9ec29aa72be7585d07abca36c53 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Mon, 25 May 2026 12:41:47 +0200 Subject: [PATCH 13/17] refactor(dao): Remove legacy method --- .../lib/src/dao/message_dao.dart | 42 ------------------ .../lib/src/dao/pinned_message_dao.dart | 44 ------------------- 2 files changed, 86 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index c6b925b1c4..97fe3a3e04 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -34,48 +34,6 @@ class MessageDao extends DatabaseAccessor Future deleteMessageByCids(List cids) async => (delete(messages)..where((tbl) => tbl.channelCid.isIn(cids))).go(); - Future _messageFromJoinRow( - TypedResult rows, { - bool fetchDraft = false, - }) async { - final userEntity = rows.readTableOrNull(_users); - final pinnedByEntity = rows.readTableOrNull(_pinnedByUsers); - final msgEntity = rows.readTable(messages); - final latestReactions = await _db.reactionDao.getReactions(msgEntity.id); - final ownReactions = await _db.reactionDao.getReactionsByUserId( - msgEntity.id, - _db.userId, - ); - - final quotedMessage = await switch (msgEntity.quotedMessageId) { - final id? => getMessageById(id), - _ => null, - }; - - final poll = await switch (msgEntity.pollId) { - final id? => _db.pollDao.getPollById(id), - _ => null, - }; - - final draft = await switch (fetchDraft) { - true => _db.draftMessageDao.getDraftMessageByCid( - msgEntity.channelCid, - parentId: msgEntity.id, - ), - _ => null, - }; - - return msgEntity.toMessage( - user: userEntity?.toUser(), - pinnedBy: pinnedByEntity?.toUser(), - latestReactions: latestReactions, - ownReactions: ownReactions, - quotedMessage: quotedMessage, - poll: poll, - draft: draft, - ); - } - /// Hydrates `rows` into `Message`s using batched lookups for related /// entities. Reactions, polls, quoted messages and (optionally) drafts are /// each fetched once via a single `WHERE ... IN (?). diff --git a/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart index e2dafb552d..0c7ff2652c 100644 --- a/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart @@ -34,50 +34,6 @@ class PinnedMessageDao extends DatabaseAccessor Future deleteMessageByCids(List cids) async => (delete(pinnedMessages)..where((tbl) => tbl.channelCid.isIn(cids))).go(); - Future _messageFromJoinRow( - TypedResult rows, { - bool fetchDraft = false, - }) async { - final userEntity = rows.readTableOrNull(_users); - final pinnedByEntity = rows.readTableOrNull(_pinnedByUsers); - final msgEntity = rows.readTable(pinnedMessages); - final latestReactions = - await _db.pinnedMessageReactionDao.getReactions(msgEntity.id); - final ownReactions = - await _db.pinnedMessageReactionDao.getReactionsByUserId( - msgEntity.id, - _db.userId, - ); - - final quotedMessage = await switch (msgEntity.quotedMessageId) { - final id? => getMessageById(id), - _ => null, - }; - - final poll = await switch (msgEntity.pollId) { - final id? => _db.pollDao.getPollById(id), - _ => null, - }; - - final draft = await switch (fetchDraft) { - true => _db.draftMessageDao.getDraftMessageByCid( - msgEntity.channelCid, - parentId: msgEntity.id, - ), - _ => null, - }; - - return msgEntity.toMessage( - user: userEntity?.toUser(), - pinnedBy: pinnedByEntity?.toUser(), - latestReactions: latestReactions, - ownReactions: ownReactions, - quotedMessage: quotedMessage, - poll: poll, - draft: draft, - ); - } - /// Hydrates `rows` (from the pinned-messages table) into `Message`s using /// batched lookups for related entities. Future> _messagesFromJoinRows( From cc443dce0fecb9d52e3749172f4fbc6b02425531 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Mon, 25 May 2026 12:44:39 +0200 Subject: [PATCH 14/17] refactor(dao): Fix warnings --- .../test/src/dao/message_dao_test.dart | 11 ----------- .../test/src/dao/pinned_message_dao_test.dart | 9 --------- .../test/src/dao/reaction_dao_test.dart | 3 +-- 3 files changed, 1 insertion(+), 22 deletions(-) diff --git a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart index 70e90a6477..c9df631c50 100644 --- a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart @@ -673,7 +673,6 @@ void main() { await messageDao.updateMessages(cid, [ Message( id: messageId, - type: 'regular', user: dbUser, text: 'Hello', createdAt: DateTime.now(), @@ -727,7 +726,6 @@ void main() { 5, (i) => Message( id: 'msg-iso-$i', - type: 'regular', user: dbUser, text: 'Hello $i', createdAt: baseTime.add(Duration(seconds: i)), @@ -793,7 +791,6 @@ void main() { await messageDao.updateMessages(cid, [ Message( id: messageId, - type: 'regular', user: dbUser, text: 'Vote please', createdAt: DateTime.now(), @@ -875,7 +872,6 @@ void main() { await messageDao.updateMessages(cid, [ Message( id: parentId, - type: 'regular', user: dbUser, text: 'msg', createdAt: DateTime.now(), @@ -933,7 +929,6 @@ void main() { await messageDao.updateMessages(cid, [ Message( id: quotedMessageId, - type: 'regular', user: dbUser, text: 'first', createdAt: baseTime, @@ -941,7 +936,6 @@ void main() { ), Message( id: quotingMessageId, - type: 'regular', user: dbUser, text: 'second', createdAt: baseTime.add(const Duration(seconds: 1)), @@ -977,14 +971,12 @@ void main() { await messageDao.updateMessages(cid, [ Message( id: 'C', - type: 'regular', user: dbUser, text: 'root', createdAt: baseTime, ), Message( id: 'B', - type: 'regular', user: dbUser, text: 'mid', createdAt: baseTime.add(const Duration(seconds: 1)), @@ -992,7 +984,6 @@ void main() { ), Message( id: 'A', - type: 'regular', user: dbUser, text: 'top', createdAt: baseTime.add(const Duration(seconds: 2)), @@ -1016,7 +1007,6 @@ void main() { 30, (i) => Message( id: 'p-msg-$i', - type: 'regular', user: dbUser, text: 'msg $i', createdAt: baseTime.add(Duration(seconds: i)), @@ -1047,7 +1037,6 @@ void main() { final page = await messageDao.getMessagesByCid( cid, messagePagination: const PaginationParams( - limit: 10, lessThan: 'p-msg-25', ), ); diff --git a/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart index 5dd37dbf8a..11f3242869 100644 --- a/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart @@ -447,7 +447,6 @@ void main() { await pinnedMessageDao.updateMessages(cid, [ Message( id: messageId, - type: 'regular', user: dbUser, text: 'Hello', createdAt: DateTime.now(), @@ -495,7 +494,6 @@ void main() { 5, (i) => Message( id: 'pmsg-iso-$i', - type: 'regular', user: dbUser, text: 'Hello $i', createdAt: baseTime.add(Duration(seconds: i)), @@ -561,7 +559,6 @@ void main() { await pinnedMessageDao.updateMessages(cid, [ Message( id: messageId, - type: 'regular', user: dbUser, text: 'Vote please', createdAt: DateTime.now(), @@ -616,7 +613,6 @@ void main() { const parentId = 'pmsg-with-draft'; final parentMessage = Message( id: parentId, - type: 'regular', user: dbUser, text: 'msg', createdAt: DateTime.now(), @@ -677,7 +673,6 @@ void main() { await pinnedMessageDao.updateMessages(cid, [ Message( id: quotedMessageId, - type: 'regular', user: dbUser, text: 'first', createdAt: baseTime, @@ -685,7 +680,6 @@ void main() { ), Message( id: quotingMessageId, - type: 'regular', user: dbUser, text: 'second', createdAt: baseTime.add(const Duration(seconds: 1)), @@ -721,14 +715,12 @@ void main() { await pinnedMessageDao.updateMessages(cid, [ Message( id: 'pC', - type: 'regular', user: dbUser, text: 'root', createdAt: baseTime, ), Message( id: 'pB', - type: 'regular', user: dbUser, text: 'mid', createdAt: baseTime.add(const Duration(seconds: 1)), @@ -736,7 +728,6 @@ void main() { ), Message( id: 'pA', - type: 'regular', user: dbUser, text: 'top', createdAt: baseTime.add(const Duration(seconds: 2)), diff --git a/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart index debef2a237..7ec4a772c6 100644 --- a/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart @@ -164,7 +164,6 @@ void main() { total, (i) => Message( id: 'cmsg-$i', - type: 'regular', user: dbUser, text: 'Hello $i', createdAt: baseTime.add(Duration(seconds: i)), @@ -232,7 +231,7 @@ void main() { }); test( - 'returns the given user\'s reactions per message id; message ids ' + "returns the given user's reactions per message id; message ids " 'with no reactions from that user map to an empty list', () async { const cid = 'test:Cid'; const targetUser = 'targetUser'; From 2e719f4c1eae81b2e66be248df64f480c501f522 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Mon, 25 May 2026 23:36:49 +0200 Subject: [PATCH 15/17] refactor(dao): Add message.id tiebreakier --- .../lib/src/dao/message_dao.dart | 84 +++++++++++-------- .../test/src/dao/message_dao_test.dart | 76 +++++++++++++++++ 2 files changed, 124 insertions(+), 36 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index 6d53bdf171..e640173ae9 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -170,27 +170,15 @@ class MessageDao extends DatabaseAccessor PaginationParams? messagePagination, }) async { final ( - lessThanCutoff, - lessThanOrEqualCutoff, - greaterThanCutoff, - greaterThanOrEqualCutoff, + lessThanCursor, + lessThanOrEqualCursor, + greaterThanCursor, + greaterThanOrEqualCursor, ) = await ( - switch (messagePagination?.lessThan) { - final id? => _lookupMessageCreatedAt(id), - _ => Future.value(), - }, - switch (messagePagination?.lessThanOrEqual) { - final id? => _lookupMessageCreatedAt(id), - _ => Future.value(), - }, - switch (messagePagination?.greaterThan) { - final id? => _lookupMessageCreatedAt(id), - _ => Future.value(), - }, - switch (messagePagination?.greaterThanOrEqual) { - final id? => _lookupMessageCreatedAt(id), - _ => Future.value(), - }, + _lookupCursor(messagePagination?.lessThan), + _lookupCursor(messagePagination?.lessThanOrEqual), + _lookupCursor(messagePagination?.greaterThan), + _lookupCursor(messagePagination?.greaterThanOrEqual), ).wait; // When the caller is paginating forward (greaterThan / greaterThanOrEqual @@ -199,9 +187,9 @@ class MessageDao extends DatabaseAccessor // recent (closest to a `lessThan` cursor, or the channel tail when no // cursor is set). The final result is always reshaped to ASC for display. final isForwardPagination = - (greaterThanCutoff != null || greaterThanOrEqualCutoff != null) && - lessThanCutoff == null && - lessThanOrEqualCutoff == null; + (greaterThanCursor != null || greaterThanOrEqualCursor != null) && + lessThanCursor == null && + lessThanOrEqualCursor == null; final orderBy = isForwardPagination ? [ @@ -224,17 +212,37 @@ class MessageDao extends DatabaseAccessor ..where(messages.parentId.isNull() | messages.showInChannel.equals(true)) ..orderBy(orderBy); - if (lessThanCutoff case final t?) { - query.where(messages.createdAt.isSmallerThanValue(t)); + // Cursor predicates compare the full `(createdAt, id)` tuple — the same + // key used in ORDER BY — so messages sharing a `createdAt` with the cursor + // fall on the correct side of the boundary. Filtering on `createdAt` alone + // would skip or repeat those siblings across pages. + if (lessThanCursor case final c?) { + query.where( + messages.createdAt.isSmallerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isSmallerThanValue(c.id)), + ); } - if (lessThanOrEqualCutoff case final t?) { - query.where(messages.createdAt.isSmallerOrEqualValue(t)); + if (lessThanOrEqualCursor case final c?) { + query.where( + messages.createdAt.isSmallerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isSmallerOrEqualValue(c.id)), + ); } - if (greaterThanCutoff case final t?) { - query.where(messages.createdAt.isBiggerThanValue(t)); + if (greaterThanCursor case final c?) { + query.where( + messages.createdAt.isBiggerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isBiggerThanValue(c.id)), + ); } - if (greaterThanOrEqualCutoff case final t?) { - query.where(messages.createdAt.isBiggerOrEqualValue(t)); + if (greaterThanOrEqualCursor case final c?) { + query.where( + messages.createdAt.isBiggerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isBiggerOrEqualValue(c.id)), + ); } if (messagePagination != null) { @@ -272,11 +280,13 @@ class MessageDao extends DatabaseAccessor ); } - /// Returns the `createdAt` of the message with [id] in the local cache, - /// or `null` if the message isn't cached or isn't visible in the channel - /// (i.e. a thread reply with `showInChannel = false`). - Future _lookupMessageCreatedAt(String id) { - return (selectOnly(messages) + /// Returns the `(createdAt, id)` cursor for the message with [id] in the + /// local cache, or `null` if [id] is null, the message isn't cached, or + /// isn't visible in the channel (i.e. a thread reply with + /// `showInChannel = false`). + Future<({DateTime createdAt, String id})?> _lookupCursor(String? id) async { + if (id == null) return null; + final createdAt = await (selectOnly(messages) ..addColumns([messages.createdAt]) ..where(messages.id.equals(id)) ..where( @@ -284,5 +294,7 @@ class MessageDao extends DatabaseAccessor )) .map((row) => row.read(messages.createdAt)) .getSingleOrNull(); + if (createdAt == null) return null; + return (createdAt: createdAt, id: id); } } diff --git a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart index 7b8d69a05c..93a9d06dd8 100644 --- a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart @@ -610,6 +610,82 @@ void main() { expect(fetchedMessages.first.id, 'testMessageId${cid}5'); expect(fetchedMessages.last.id, 'testMessageId${cid}14'); }); + + test('cursor with tied createdAt does not skip or duplicate siblings', + () async { + // Three messages share an identical `createdAt`. The SQL ORDER BY uses + // the `(createdAt, id)` tuple, so within the trio the relative order is + // by id (lexicographic). A cursor at `msg_tieB` must split the trio + // cleanly: `msg_tieA` lands on the "before" side, `msg_tieC` on the + // "after" side. A `createdAt`-only WHERE predicate would collapse all + // three into the cursor's bucket and drop or keep them together. + final users = [User(id: 'tieUser')]; + await database.userDao.updateUsers(users); + await database.channelDao.updateChannels([ChannelModel(cid: cid)]); + + final tie = DateTime.now(); + final earlier = tie.subtract(const Duration(seconds: 1)); + final later = tie.add(const Duration(seconds: 1)); + + Message m(String id, DateTime t) => Message( + id: id, + type: 'regular', + user: users.first, + createdAt: t, + updatedAt: t, + text: id, + ); + + await messageDao.updateMessages(cid, [ + m('msg_pre', earlier), + m('msg_tieA', tie), + m('msg_tieB', tie), + m('msg_tieC', tie), + m('msg_post', later), + ]); + + final before = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'msg_tieB', + ), + ); + expect(before.map((m) => m.id).toList(), ['msg_pre', 'msg_tieA']); + + final after = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'msg_tieB', + ), + ); + expect(after.map((m) => m.id).toList(), ['msg_tieC', 'msg_post']); + + final atOrBefore = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThanOrEqual: 'msg_tieB', + ), + ); + expect( + atOrBefore.map((m) => m.id).toList(), + ['msg_pre', 'msg_tieA', 'msg_tieB'], + ); + + final atOrAfter = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThanOrEqual: 'msg_tieB', + ), + ); + expect( + atOrAfter.map((m) => m.id).toList(), + ['msg_tieB', 'msg_tieC', 'msg_post'], + ); + }); }); test('updateMessages', () async { From 35569a0739ab7e817b7e7ff28e5c4a35b0c66549 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Mon, 25 May 2026 23:38:33 +0200 Subject: [PATCH 16/17] refactor(dao): Add message.id tiebreaker --- .../stream_chat_persistence/test/src/dao/message_dao_test.dart | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart index 93a9d06dd8..7fe81432b4 100644 --- a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart @@ -629,7 +629,6 @@ void main() { Message m(String id, DateTime t) => Message( id: id, - type: 'regular', user: users.first, createdAt: t, updatedAt: t, From fa6898e81c420d187f0589108ff87f0c77a2d09d Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Mon, 25 May 2026 23:42:51 +0200 Subject: [PATCH 17/17] refactor(dao): Update CHANGELOG.md --- packages/stream_chat_persistence/CHANGELOG.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/stream_chat_persistence/CHANGELOG.md b/packages/stream_chat_persistence/CHANGELOG.md index a4388ca94d..8ff1b9bb00 100644 --- a/packages/stream_chat_persistence/CHANGELOG.md +++ b/packages/stream_chat_persistence/CHANGELOG.md @@ -7,12 +7,9 @@ - Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. - Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. -✅ Added - -- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`. - 🐞 Fixed +- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`. - `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour. - `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot.