diff --git a/packages/stream_chat_persistence/CHANGELOG.md b/packages/stream_chat_persistence/CHANGELOG.md index 31bbe53fe..468ae6807 100644 --- a/packages/stream_chat_persistence/CHANGELOG.md +++ b/packages/stream_chat_persistence/CHANGELOG.md @@ -1,8 +1,19 @@ -# Upcoming +## Upcoming Changes 🚀 Performance - Reduce the number of DB reads in the `ChatPersistenceClient.getChannelStates` method. +- Read only the messages matching the `PaginationParams` from DB when calling `MessageDao.getMessagesByCid` instead of reading all messages for the channel and applying pagination in memory. +- Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. +- Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. +- Improve the message read times from DB. + +🐞 Fixed + +- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`. +- `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour. +- `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot. + ## 9.24.0 diff --git a/packages/stream_chat_persistence/lib/src/dao/draft_message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/draft_message_dao.dart index b5a5cb6fc..00e6f7f79 100644 --- a/packages/stream_chat_persistence/lib/src/dao/draft_message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/draft_message_dao.dart @@ -3,6 +3,7 @@ import 'package:drift/drift.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; +import 'package:stream_chat_persistence/src/db/query_utils.dart'; import 'package:stream_chat_persistence/src/entity/entity.dart'; import 'package:stream_chat_persistence/src/mapper/mapper.dart'; @@ -66,6 +67,27 @@ class DraftMessageDao extends DatabaseAccessor return _draftFromEntity(result); } + /// Returns thread drafts in [cid] for every parent message id in + /// [parentIds], keyed by parent message id. + Future> getDraftMessagesByParentIds( + String cid, + List parentIds, + ) async { + if (parentIds.isEmpty) return const {}; + final result = {for (final id in parentIds) id: null}; + for (final chunk in chunked(parentIds)) { + final query = select(draftMessages) + ..where((tbl) => tbl.channelCid.equals(cid) & tbl.parentId.isIn(chunk)); + final entities = await query.get(); + for (final entity in entities) { + if (entity.parentId case final pid?) { + result[pid] = await _draftFromEntity(entity); + } + } + } + return result; + } + /// Updates the draft message data of a particular channel with /// the new [messageList] data. Future updateDraftMessages(List draftMessageList) { diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index 77d84dff0..15a34402b 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -1,5 +1,3 @@ -import 'dart:math'; - import 'package:drift/drift.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; @@ -36,42 +34,119 @@ class MessageDao extends DatabaseAccessor Future deleteMessageByCids(List cids) async => (delete(messages)..where((tbl) => tbl.channelCid.isIn(cids))).go(); - Future _messageFromJoinRow( - TypedResult rows, { + /// Hydrates `rows` into `Message`s using batched lookups for related + /// entities. Reactions, polls, quoted messages and (optionally) drafts are + /// each fetched once via a single `WHERE ... IN (?). + Future> _messagesFromJoinRows( + List rows, { bool fetchDraft = false, }) async { - final userEntity = rows.readTableOrNull(_users); - final pinnedByEntity = rows.readTableOrNull(_pinnedByUsers); - final msgEntity = rows.readTable(messages); - final latestReactions = await _db.reactionDao.getReactions(msgEntity.id); - final ownReactions = await _db.reactionDao.getReactionsByUserId( - msgEntity.id, - _db.userId, - ); + if (rows.isEmpty) return const []; - final quotedMessage = await switch (msgEntity.quotedMessageId) { - final id? => getMessageById(id), - _ => null, - }; + final messageIds = []; + final quotedIds = []; + final pollIds = []; + // note: While possible, in real case scenarios this will NOT hold more than + // a single value. + final cids = {}; + for (final row in rows) { + final msg = row.readTable(messages); + messageIds.add(msg.id); + if (msg.quotedMessageId case final id?) quotedIds.add(id); + if (msg.pollId case final id?) pollIds.add(id); + cids.add(msg.channelCid); + } - final poll = await switch (msgEntity.pollId) { - final id? => _db.pollDao.getPollById(id), - _ => null, - }; + final results = await Future.wait([ + // Reactions + _db.reactionDao.getReactionsForMessages(messageIds), + // Own reactions + _db.reactionDao.getReactionsForMessagesByUserId(messageIds, _db.userId), + // Polls + if (pollIds.isNotEmpty) + _db.pollDao.getPollsByIds(pollIds) + else + Future.value(const {}), + // Drafts + if (fetchDraft) + Future.wait([ + for (final cid in cids) + _db.draftMessageDao + .getDraftMessagesByParentIds(cid, messageIds) + .then((map) => MapEntry(cid, map)), + ]).then(Map.fromEntries) + else + Future.value(const >{}), + ]); - final draft = await switch (fetchDraft) { - true => _db.draftMessageDao.getDraftMessageByCid( - msgEntity.channelCid, - parentId: msgEntity.id, + final latestReactionsByMsg = results[0] as Map>; + final ownReactionsByMsg = results[1] as Map>; + final pollsById = results[2] as Map; + final draftsByCidByParentId = + results[3] as Map>; + + final quotedById = {}; + if (quotedIds.isNotEmpty) { + final quoteRows = await (select(messages).join([ + leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), + leftOuterJoin( + _pinnedByUsers, + messages.pinnedByUserId.equalsExp(_pinnedByUsers.id), ), + ]) + ..where(messages.id.isIn(quotedIds))) + .get(); + final quotedMessages = await _messagesFromJoinRows( + quoteRows, + fetchDraft: true, + ); + for (final m in quotedMessages) { + quotedById[m.id] = m; + } + } + + return [ + for (final row in rows) + _buildMessage( + row, + latestReactionsByMsg: latestReactionsByMsg, + ownReactionsByMsg: ownReactionsByMsg, + pollsById: pollsById, + quotedById: quotedById, + draftsByCidByParentId: draftsByCidByParentId, + ), + ]; + } + + /// Builds a single [Message] from a join row + the pre-fetched maps + /// assembled by [_messagesFromJoinRows]. + Message _buildMessage( + TypedResult row, { + required Map> latestReactionsByMsg, + required Map> ownReactionsByMsg, + required Map pollsById, + required Map quotedById, + required Map> draftsByCidByParentId, + }) { + final userEntity = row.readTableOrNull(_users); + final pinnedByEntity = row.readTableOrNull(_pinnedByUsers); + final msgEntity = row.readTable(messages); + + final quotedMessage = switch (msgEntity.quotedMessageId) { + final id? => quotedById[id], _ => null, }; + final poll = switch (msgEntity.pollId) { + final id? => pollsById[id], + _ => null, + }; + final draft = draftsByCidByParentId[msgEntity.channelCid]?[msgEntity.id]; return msgEntity.toMessage( user: userEntity?.toUser(), pinnedBy: pinnedByEntity?.toUser(), - latestReactions: latestReactions, - ownReactions: ownReactions, + latestReactions: latestReactionsByMsg[msgEntity.id] ?? const [], + ownReactions: ownReactionsByMsg[msgEntity.id] ?? const [], quotedMessage: quotedMessage, poll: poll, draft: draft, @@ -98,27 +173,27 @@ class MessageDao extends DatabaseAccessor final result = await query.getSingleOrNull(); if (result == null) return null; - return _messageFromJoinRow( - result, - fetchDraft: fetchDraft, - ); + final hydrated = + await _messagesFromJoinRows([result], fetchDraft: fetchDraft); + return hydrated.firstOrNull; } /// Returns all the messages of a particular thread by matching /// [Messages.channelCid] with [cid] - Future> getThreadMessages(String cid) async => - Future.wait(await (select(messages).join([ - leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), - leftOuterJoin( - _pinnedByUsers, - messages.pinnedByUserId.equalsExp(_pinnedByUsers.id), - ), - ]) - ..where(messages.channelCid.equals(cid)) - ..where(messages.parentId.isNotNull()) - ..orderBy([OrderingTerm.asc(messages.createdAt)])) - .map(_messageFromJoinRow) - .get()); + Future> getThreadMessages(String cid) async { + final rows = await (select(messages).join([ + leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), + leftOuterJoin( + _pinnedByUsers, + messages.pinnedByUserId.equalsExp(_pinnedByUsers.id), + ), + ]) + ..where(messages.channelCid.equals(cid)) + ..where(messages.parentId.isNotNull()) + ..orderBy([OrderingTerm.asc(messages.createdAt)])) + .get(); + return _messagesFromJoinRows(rows); + } /// Returns all the messages of a particular thread by matching /// [Messages.parentId] with [parentId] @@ -126,7 +201,7 @@ class MessageDao extends DatabaseAccessor String parentId, { PaginationParams? options, }) async { - final msgList = await Future.wait(await (select(messages).join([ + final rows = await (select(messages).join([ leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), leftOuterJoin( _pinnedByUsers, @@ -136,30 +211,32 @@ class MessageDao extends DatabaseAccessor ..where(messages.parentId.isNotNull()) ..where(messages.parentId.equals(parentId)) ..orderBy([OrderingTerm.asc(messages.createdAt)])) - .map(_messageFromJoinRow) - .get()); + .get(); + final msgList = await _messagesFromJoinRows(rows); if (msgList.isNotEmpty) { + final mutable = msgList.toList(); if (options?.lessThan != null) { - final lessThanIndex = msgList.indexWhere( + final lessThanIndex = mutable.indexWhere( (m) => m.id == options!.lessThan, ); if (lessThanIndex != -1) { - msgList.removeRange(lessThanIndex, msgList.length); + mutable.removeRange(lessThanIndex, mutable.length); } } if (options?.greaterThan != null) { - final greaterThanIndex = msgList.indexWhere( + final greaterThanIndex = mutable.indexWhere( (m) => m.id == options!.greaterThan, ); if (greaterThanIndex != -1) { - msgList.removeRange(0, greaterThanIndex); + mutable.removeRange(0, greaterThanIndex); } } final limit = options?.limit; if (limit != null && limit > 0) { - return msgList.take(limit).toList(); + return mutable.take(limit).toList(); } + return mutable; } return msgList; } @@ -171,6 +248,38 @@ class MessageDao extends DatabaseAccessor bool fetchDraft = true, PaginationParams? messagePagination, }) async { + final ( + lessThanCursor, + lessThanOrEqualCursor, + greaterThanCursor, + greaterThanOrEqualCursor, + ) = await ( + _lookupCursor(messagePagination?.lessThan), + _lookupCursor(messagePagination?.lessThanOrEqual), + _lookupCursor(messagePagination?.greaterThan), + _lookupCursor(messagePagination?.greaterThanOrEqual), + ).wait; + + // When the caller is paginating forward (greaterThan / greaterThanOrEqual + // only), order ASC so the SQL `LIMIT` retains the N messages immediately + // AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N most + // recent (closest to a `lessThan` cursor, or the channel tail when no + // cursor is set). The final result is always reshaped to ASC for display. + final isForwardPagination = + (greaterThanCursor != null || greaterThanOrEqualCursor != null) && + lessThanCursor == null && + lessThanOrEqualCursor == null; + + final orderBy = isForwardPagination + ? [ + OrderingTerm.asc(messages.createdAt), + OrderingTerm.asc(messages.id), + ] + : [ + OrderingTerm.desc(messages.createdAt), + OrderingTerm.desc(messages.id), + ]; + final query = select(messages).join([ leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), leftOuterJoin( @@ -180,44 +289,48 @@ class MessageDao extends DatabaseAccessor ]) ..where(messages.channelCid.equals(cid)) ..where(messages.parentId.isNull() | messages.showInChannel.equals(true)) - ..orderBy([OrderingTerm.asc(messages.createdAt)]); - - final result = await query.get(); - if (result.isEmpty) return []; + ..orderBy(orderBy); - final msgList = await Future.wait( - result.map( - (row) => _messageFromJoinRow( - row, - fetchDraft: fetchDraft, - ), - ), - ); + // Cursor predicates compare the full `(createdAt, id)` tuple — the same + // key used in ORDER BY — so messages sharing a `createdAt` with the cursor + // fall on the correct side of the boundary. Filtering on `createdAt` alone + // would skip or repeat those siblings across pages. + if (lessThanCursor case final c?) { + query.where( + messages.createdAt.isSmallerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isSmallerThanValue(c.id)), + ); + } + if (lessThanOrEqualCursor case final c?) { + query.where( + messages.createdAt.isSmallerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isSmallerOrEqualValue(c.id)), + ); + } + if (greaterThanCursor case final c?) { + query.where( + messages.createdAt.isBiggerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isBiggerThanValue(c.id)), + ); + } + if (greaterThanOrEqualCursor case final c?) { + query.where( + messages.createdAt.isBiggerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isBiggerOrEqualValue(c.id)), + ); + } - if (msgList.isNotEmpty) { - if (messagePagination?.lessThan != null) { - final lessThanIndex = msgList.indexWhere( - (m) => m.id == messagePagination!.lessThan, - ); - if (lessThanIndex != -1) { - msgList.removeRange(lessThanIndex, msgList.length); - } - } - if (messagePagination?.greaterThan != null) { - final greaterThanIndex = msgList.indexWhere( - (m) => m.id == messagePagination!.greaterThan, - ); - if (greaterThanIndex != -1) { - msgList.removeRange(0, greaterThanIndex); - } - } - if (messagePagination?.limit != null) { - return msgList - .skip(max(0, msgList.length - messagePagination!.limit)) - .toList(); - } + if (messagePagination != null) { + query.limit(messagePagination.limit); } - return msgList; + + final rows = await query.get(); + final orderedRows = isForwardPagination ? rows : rows.reversed.toList(); + return _messagesFromJoinRows(orderedRows, fetchDraft: fetchDraft); } /// Updates the message data of a particular channel with @@ -241,4 +354,22 @@ class MessageDao extends DatabaseAccessor (batch) => batch.insertAllOnConflictUpdate(messages, entities), ); } + + /// Returns the `(createdAt, id)` cursor for the message with [id] in the + /// local cache, or `null` if [id] is null, the message isn't cached, or + /// isn't visible in the channel (i.e. a thread reply with + /// `showInChannel = false`). + Future<({DateTime createdAt, String id})?> _lookupCursor(String? id) async { + if (id == null) return null; + final createdAt = await (selectOnly(messages) + ..addColumns([messages.createdAt]) + ..where(messages.id.equals(id)) + ..where( + messages.parentId.isNull() | messages.showInChannel.equals(true), + )) + .map((row) => row.read(messages.createdAt)) + .getSingleOrNull(); + if (createdAt == null) return null; + return (createdAt: createdAt, id: id); + } } diff --git a/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart index 7992accf5..0c7ff2652 100644 --- a/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart @@ -3,7 +3,6 @@ import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; import 'package:stream_chat_persistence/src/entity/pinned_messages.dart'; import 'package:stream_chat_persistence/src/entity/users.dart'; - import 'package:stream_chat_persistence/src/mapper/mapper.dart'; part 'pinned_message_dao.g.dart'; @@ -35,44 +34,117 @@ class PinnedMessageDao extends DatabaseAccessor Future deleteMessageByCids(List cids) async => (delete(pinnedMessages)..where((tbl) => tbl.channelCid.isIn(cids))).go(); - Future _messageFromJoinRow( - TypedResult rows, { + /// Hydrates `rows` (from the pinned-messages table) into `Message`s using + /// batched lookups for related entities. + Future> _messagesFromJoinRows( + List rows, { bool fetchDraft = false, }) async { - final userEntity = rows.readTableOrNull(_users); - final pinnedByEntity = rows.readTableOrNull(_pinnedByUsers); - final msgEntity = rows.readTable(pinnedMessages); - final latestReactions = - await _db.pinnedMessageReactionDao.getReactions(msgEntity.id); - final ownReactions = - await _db.pinnedMessageReactionDao.getReactionsByUserId( - msgEntity.id, - _db.userId, - ); + if (rows.isEmpty) return const []; - final quotedMessage = await switch (msgEntity.quotedMessageId) { - final id? => getMessageById(id), - _ => null, - }; + final messageIds = []; + final quotedIds = []; + final pollIds = []; + // note: While possible, in real case scenarios this will NOT hold more than + // a single value. + final cids = {}; + for (final row in rows) { + final msg = row.readTable(pinnedMessages); + messageIds.add(msg.id); + if (msg.quotedMessageId case final id?) quotedIds.add(id); + if (msg.pollId case final id?) pollIds.add(id); + cids.add(msg.channelCid); + } - final poll = await switch (msgEntity.pollId) { - final id? => _db.pollDao.getPollById(id), - _ => null, - }; + final results = await Future.wait([ + // Reactions + _db.pinnedMessageReactionDao.getReactionsForMessages(messageIds), + // Own reactions + _db.pinnedMessageReactionDao + .getReactionsForMessagesByUserId(messageIds, _db.userId), + // Polls + if (pollIds.isNotEmpty) + _db.pollDao.getPollsByIds(pollIds) + else + Future.value(const {}), + // Drafts + if (fetchDraft) + Future.wait([ + for (final cid in cids) + _db.draftMessageDao + .getDraftMessagesByParentIds(cid, messageIds) + .then((map) => MapEntry(cid, map)), + ]).then(Map.fromEntries) + else + Future.value(const >{}), + ]); + + final latestReactionsByMsg = results[0] as Map>; + final ownReactionsByMsg = results[1] as Map>; + final pollsById = results[2] as Map; + final draftsByCidByParentId = + results[3] as Map>; + + final quotedById = {}; + if (quotedIds.isNotEmpty) { + final quoteRows = await (select(pinnedMessages).join([ + leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)), + leftOuterJoin( + _pinnedByUsers, + pinnedMessages.pinnedByUserId.equalsExp(_pinnedByUsers.id), + ), + ]) + ..where(pinnedMessages.id.isIn(quotedIds))) + .get(); + final quotedMessages = await _messagesFromJoinRows( + quoteRows, + fetchDraft: true, + ); + for (final m in quotedMessages) { + quotedById[m.id] = m; + } + } - final draft = await switch (fetchDraft) { - true => _db.draftMessageDao.getDraftMessageByCid( - msgEntity.channelCid, - parentId: msgEntity.id, + return [ + for (final row in rows) + _buildMessage( + row, + latestReactionsByMsg: latestReactionsByMsg, + ownReactionsByMsg: ownReactionsByMsg, + pollsById: pollsById, + quotedById: quotedById, + draftsByCidByParentId: draftsByCidByParentId, ), + ]; + } + + Message _buildMessage( + TypedResult row, { + required Map> latestReactionsByMsg, + required Map> ownReactionsByMsg, + required Map pollsById, + required Map quotedById, + required Map> draftsByCidByParentId, + }) { + final userEntity = row.readTableOrNull(_users); + final pinnedByEntity = row.readTableOrNull(_pinnedByUsers); + final msgEntity = row.readTable(pinnedMessages); + + final quotedMessage = switch (msgEntity.quotedMessageId) { + final id? => quotedById[id], + _ => null, + }; + final poll = switch (msgEntity.pollId) { + final id? => pollsById[id], _ => null, }; + final draft = draftsByCidByParentId[msgEntity.channelCid]?[msgEntity.id]; return msgEntity.toMessage( user: userEntity?.toUser(), pinnedBy: pinnedByEntity?.toUser(), - latestReactions: latestReactions, - ownReactions: ownReactions, + latestReactions: latestReactionsByMsg[msgEntity.id] ?? const [], + ownReactions: ownReactionsByMsg[msgEntity.id] ?? const [], quotedMessage: quotedMessage, poll: poll, draft: draft, @@ -96,27 +168,27 @@ class PinnedMessageDao extends DatabaseAccessor final result = await query.getSingleOrNull(); if (result == null) return null; - return _messageFromJoinRow( - result, - fetchDraft: fetchDraft, - ); + final hydrated = + await _messagesFromJoinRows([result], fetchDraft: fetchDraft); + return hydrated.firstOrNull; } /// Returns all the messages of a particular thread by matching /// [PinnedMessages.channelCid] with [cid] - Future> getThreadMessages(String cid) async => - Future.wait(await (select(pinnedMessages).join([ - leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)), - leftOuterJoin( - _pinnedByUsers, - pinnedMessages.pinnedByUserId.equalsExp(_pinnedByUsers.id), - ), - ]) - ..where(pinnedMessages.channelCid.equals(cid)) - ..where(pinnedMessages.parentId.isNotNull()) - ..orderBy([OrderingTerm.asc(pinnedMessages.createdAt)])) - .map(_messageFromJoinRow) - .get()); + Future> getThreadMessages(String cid) async { + final rows = await (select(pinnedMessages).join([ + leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)), + leftOuterJoin( + _pinnedByUsers, + pinnedMessages.pinnedByUserId.equalsExp(_pinnedByUsers.id), + ), + ]) + ..where(pinnedMessages.channelCid.equals(cid)) + ..where(pinnedMessages.parentId.isNotNull()) + ..orderBy([OrderingTerm.asc(pinnedMessages.createdAt)])) + .get(); + return _messagesFromJoinRows(rows); + } /// Returns all the messages of a particular thread by matching /// [PinnedMessages.parentId] with [parentId] @@ -124,7 +196,7 @@ class PinnedMessageDao extends DatabaseAccessor String parentId, { PaginationParams? options, }) async { - final msgList = await Future.wait(await (select(pinnedMessages).join([ + final rows = await (select(pinnedMessages).join([ leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)), leftOuterJoin( _pinnedByUsers, @@ -134,29 +206,31 @@ class PinnedMessageDao extends DatabaseAccessor ..where(pinnedMessages.parentId.isNotNull()) ..where(pinnedMessages.parentId.equals(parentId)) ..orderBy([OrderingTerm.asc(pinnedMessages.createdAt)])) - .map(_messageFromJoinRow) - .get()); + .get(); + final msgList = await _messagesFromJoinRows(rows); if (msgList.isNotEmpty) { + final mutable = msgList.toList(); if (options?.lessThan != null) { - final lessThanIndex = msgList.indexWhere( + final lessThanIndex = mutable.indexWhere( (m) => m.id == options!.lessThan, ); if (lessThanIndex != -1) { - msgList.removeRange(lessThanIndex, msgList.length); + mutable.removeRange(lessThanIndex, mutable.length); } } if (options?.greaterThan != null) { - final greaterThanIndex = msgList.indexWhere( + final greaterThanIndex = mutable.indexWhere( (m) => m.id == options!.greaterThan, ); if (greaterThanIndex != -1) { - msgList.removeRange(0, greaterThanIndex); + mutable.removeRange(0, greaterThanIndex); } } if (options?.limit != null) { - return msgList.take(options!.limit).toList(); + return mutable.take(options!.limit).toList(); } + return mutable; } return msgList; } @@ -180,38 +254,31 @@ class PinnedMessageDao extends DatabaseAccessor pinnedMessages.showInChannel.equals(true)) ..orderBy([OrderingTerm.asc(pinnedMessages.createdAt)]); - final result = await query.get(); - if (result.isEmpty) return []; - - final msgList = await Future.wait( - result.map( - (row) => _messageFromJoinRow( - row, - fetchDraft: fetchDraft, - ), - ), - ); + final rows = await query.get(); + final msgList = await _messagesFromJoinRows(rows, fetchDraft: fetchDraft); if (msgList.isNotEmpty) { + final mutable = msgList.toList(); if (messagePagination?.lessThan != null) { - final lessThanIndex = msgList.indexWhere( + final lessThanIndex = mutable.indexWhere( (m) => m.id == messagePagination!.lessThan, ); if (lessThanIndex != -1) { - msgList.removeRange(lessThanIndex, msgList.length); + mutable.removeRange(lessThanIndex, mutable.length); } } if (messagePagination?.greaterThan != null) { - final greaterThanIndex = msgList.indexWhere( + final greaterThanIndex = mutable.indexWhere( (m) => m.id == messagePagination!.greaterThan, ); if (greaterThanIndex != -1) { - msgList.removeRange(0, greaterThanIndex); + mutable.removeRange(0, greaterThanIndex); } } if (messagePagination?.limit != null) { - return msgList.take(messagePagination!.limit).toList(); + return mutable.take(messagePagination!.limit).toList(); } + return mutable; } return msgList; } diff --git a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart index c5c5a6d45..27f5f4439 100644 --- a/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart @@ -1,6 +1,7 @@ import 'package:drift/drift.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; +import 'package:stream_chat_persistence/src/db/query_utils.dart'; import 'package:stream_chat_persistence/src/entity/pinned_message_reactions.dart'; import 'package:stream_chat_persistence/src/entity/users.dart'; import 'package:stream_chat_persistence/src/mapper/mapper.dart'; @@ -15,29 +16,70 @@ class PinnedMessageReactionDao extends DatabaseAccessor PinnedMessageReactionDao(super.db); /// Returns all the reactions of a particular message by matching - /// [Reactions.messageId] with [messageId] - Future> getReactions(String messageId) => - (select(pinnedMessageReactions).join([ - leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)), - ]) - ..where(pinnedMessageReactions.messageId.equals(messageId)) - ..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)])) - .map((rows) { - final userEntity = rows.readTableOrNull(users); - final reactionEntity = rows.readTable(pinnedMessageReactions); - return reactionEntity.toReaction(user: userEntity?.toUser()); - }).get(); + /// [Reactions.messageId] with [messageId]. + /// + /// Not used in production — `PinnedMessageDao` hydrates via the batched + /// [getReactionsForMessages]. Kept for convenience (tests + ad-hoc + /// single-id lookups). + Future> getReactions(String messageId) { + final where = pinnedMessageReactions.messageId.equals(messageId); + return _selectReactions(where); + } /// Returns all the reactions of a particular message /// added by a particular user by matching /// [Reactions.messageId] with [messageId] and - /// [Reactions.userId] with [userId] + /// [Reactions.userId] with [userId]. + /// + /// Not used in production — `PinnedMessageDao` hydrates via the batched + /// [getReactionsForMessagesByUserId]. Kept for convenience. Future> getReactionsByUserId( String messageId, String userId, + ) { + final where = pinnedMessageReactions.messageId.equals(messageId) & + pinnedMessageReactions.userId.equals(userId); + return _selectReactions(where); + } + + /// Returns pinned-message reactions for every id in [messageIds], grouped + /// by message id. + Future>> getReactionsForMessages( + List messageIds, ) async { - final reactions = await getReactions(messageId); - return reactions.where((it) => it.userId == userId).toList(); + if (messageIds.isEmpty) return const {}; + final grouped = >{ + for (final id in messageIds) id: [], + }; + for (final chunk in chunked(messageIds)) { + final where = pinnedMessageReactions.messageId.isIn(chunk); + final rows = await _selectReactions(where); + for (final r in rows) { + grouped[r.messageId]!.add(r); + } + } + return grouped; + } + + /// Returns pinned-message reactions for every id in [messageIds] that were + /// added by [userId], grouped by message id. + Future>> getReactionsForMessagesByUserId( + List messageIds, + String userId, + ) async { + if (messageIds.isEmpty) return const {}; + final grouped = >{ + for (final id in messageIds) id: [], + }; + for (final chunk in chunked(messageIds)) { + final where = pinnedMessageReactions.messageId.isIn(chunk) & + pinnedMessageReactions.userId.equals(userId); + final rows = await _selectReactions(where); + for (final r in rows) { + grouped[r.messageId]!.add(r); + } + } + return grouped; } /// Updates the reactions data with the new [reactionList] data @@ -57,4 +99,17 @@ class PinnedMessageReactionDao extends DatabaseAccessor (r) => r.messageId.isIn(messageIds), ); }); + + Future> _selectReactions(Expression where) { + final rows = select(pinnedMessageReactions).join([ + leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)), + ]) + ..where(where) + ..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)]); + return rows.map((row) { + final reactionEntity = row.readTable(pinnedMessageReactions); + final userEntity = row.readTableOrNull(users); + return reactionEntity.toReaction(user: userEntity?.toUser()); + }).get(); + } } diff --git a/packages/stream_chat_persistence/lib/src/dao/poll_dao.dart b/packages/stream_chat_persistence/lib/src/dao/poll_dao.dart index 10a10a024..8d974e15c 100644 --- a/packages/stream_chat_persistence/lib/src/dao/poll_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/poll_dao.dart @@ -1,6 +1,7 @@ import 'package:drift/drift.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; +import 'package:stream_chat_persistence/src/db/query_utils.dart'; import 'package:stream_chat_persistence/src/entity/poll_votes.dart'; import 'package:stream_chat_persistence/src/entity/polls.dart'; import 'package:stream_chat_persistence/src/entity/users.dart'; @@ -51,6 +52,32 @@ class PollDao extends DatabaseAccessor with _$PollDaoMixin { .map(_pollFromJoinRow) .getSingleOrNull(); + /// Returns polls for every id in [pollIds], keyed by poll id. + Future> getPollsByIds(List pollIds) async { + if (pollIds.isEmpty) return const {}; + + // Group votes once for every requested poll. The dense-map contract on + // `getPollVotesForPolls` means every input id resolves to a (possibly + // empty) list with a single lookup. + final votesByPoll = await _db.pollVoteDao.getPollVotesForPolls(pollIds); + + final result = {for (final id in pollIds) id: null}; + for (final chunk in chunked(pollIds)) { + final where = polls.id.isIn(chunk); + final rows = await (select(polls)..where((_) => where)).join( + [leftOuterJoin(users, polls.createdById.equalsExp(users.id))]).get(); + for (final row in rows) { + final pollEntity = row.readTable(polls); + // Same as `_pollFromJoinRow` => reads users via `readTable` (not + // `readTableOrNull`) on a LEFT JOIN + final userEntity = row.readTable(users); + final allVotes = votesByPoll[pollEntity.id] ?? const []; + result[pollEntity.id] = _buildPoll(pollEntity, userEntity, allVotes); + } + } + return result; + } + /// Updates all the polls using the new [pollList] data Future updatePolls(List pollList) => batch( (it) => it.insertAllOnConflictUpdate( @@ -69,4 +96,32 @@ class PollDao extends DatabaseAccessor with _$PollDaoMixin { /// Deletes all the polls whose [Polls.id] is present in [pollIds] Future deletePollsByIds(List pollIds) => (delete(polls)..where((tbl) => tbl.id.isIn(pollIds))).go(); + + Poll _buildPoll( + PollEntity pollEntity, + UserEntity userEntity, + List allVotes, + ) { + final latestAnswers = allVotes.where((it) => it.isAnswer); + final ownVotesAndAnswers = allVotes.where((it) => it.userId == _db.userId); + + final latestVotesByOption = >{}; + for (final vote in allVotes) { + if (vote.isAnswer) continue; + if (vote.optionId case final optionId?) { + latestVotesByOption.update( + optionId, + (value) => [...value, vote], + ifAbsent: () => [vote], + ); + } + } + + return pollEntity.toPoll( + createdBy: userEntity.toUser(), + latestAnswers: latestAnswers.toList(), + ownVotesAndAnswers: ownVotesAndAnswers.toList(), + latestVotesByOption: latestVotesByOption, + ); + } } diff --git a/packages/stream_chat_persistence/lib/src/dao/poll_vote_dao.dart b/packages/stream_chat_persistence/lib/src/dao/poll_vote_dao.dart index 55884f4a7..571ecfa56 100644 --- a/packages/stream_chat_persistence/lib/src/dao/poll_vote_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/poll_vote_dao.dart @@ -1,6 +1,7 @@ import 'package:drift/drift.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; +import 'package:stream_chat_persistence/src/db/query_utils.dart'; import 'package:stream_chat_persistence/src/entity/poll_votes.dart'; import 'package:stream_chat_persistence/src/entity/polls.dart'; import 'package:stream_chat_persistence/src/entity/users.dart'; @@ -30,6 +31,33 @@ class PollVoteDao extends DatabaseAccessor return pollVoteEntity.toPollVote(user: userEntity?.toUser()); }).get(); + /// Returns poll votes for every id in [pollIds], grouped by poll id. + Future>> getPollVotesForPolls( + List pollIds, + ) async { + if (pollIds.isEmpty) return const {}; + final grouped = >{ + for (final id in pollIds) id: [], + }; + for (final chunk in chunked(pollIds)) { + final where = pollVotes.pollId.isIn(chunk); + final rows = await (select(pollVotes).join([ + leftOuterJoin(users, pollVotes.userId.equalsExp(users.id)), + ]) + ..where(where) + ..orderBy([OrderingTerm.asc(pollVotes.createdAt)])) + .map((row) { + final userEntity = row.readTableOrNull(users); + final pollVoteEntity = row.readTable(pollVotes); + return pollVoteEntity.toPollVote(user: userEntity?.toUser()); + }).get(); + for (final v in rows) { + grouped[v.pollId]!.add(v); + } + } + return grouped; + } + /// Updates the poll votes data with the new [pollVoteList] data Future updatePollVotes(List pollVoteList) => batch( (it) => it.insertAllOnConflictUpdate( diff --git a/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart b/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart index d6dae9bd9..43c41a8be 100644 --- a/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart @@ -1,6 +1,7 @@ import 'package:drift/drift.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:stream_chat_persistence/src/db/drift_chat_database.dart'; +import 'package:stream_chat_persistence/src/db/query_utils.dart'; import 'package:stream_chat_persistence/src/entity/reactions.dart'; import 'package:stream_chat_persistence/src/entity/users.dart'; import 'package:stream_chat_persistence/src/mapper/mapper.dart'; @@ -15,29 +16,69 @@ class ReactionDao extends DatabaseAccessor ReactionDao(super.db); /// Returns all the reactions of a particular message by matching - /// [Reactions.messageId] with [messageId] - Future> getReactions(String messageId) => - (select(reactions).join([ - leftOuterJoin(users, reactions.userId.equalsExp(users.id)), - ]) - ..where(reactions.messageId.equals(messageId)) - ..orderBy([OrderingTerm.asc(reactions.createdAt)])) - .map((rows) { - final userEntity = rows.readTableOrNull(users); - final reactionEntity = rows.readTable(reactions); - return reactionEntity.toReaction(user: userEntity?.toUser()); - }).get(); + /// [Reactions.messageId] with [messageId]. + /// + /// Not used in production — `MessageDao` hydrates via the batched + /// [getReactionsForMessages]. Kept for convenience (tests + ad-hoc + /// single-id lookups). + Future> getReactions(String messageId) { + final where = reactions.messageId.equals(messageId); + return _selectReactions(where); + } /// Returns all the reactions of a particular message /// added by a particular user by matching /// [Reactions.messageId] with [messageId] and - /// [Reactions.userId] with [userId] + /// [Reactions.userId] with [userId]. + /// + /// Not used in production — `MessageDao` hydrates via the batched + /// [getReactionsForMessagesByUserId]. Kept for convenience. Future> getReactionsByUserId( String messageId, String userId, + ) { + final where = + reactions.messageId.equals(messageId) & reactions.userId.equals(userId); + return _selectReactions(where); + } + + /// Returns reactions for every id in [messageIds], grouped by message id. + Future>> getReactionsForMessages( + List messageIds, ) async { - final reactions = await getReactions(messageId); - return reactions.where((it) => it.userId == userId).toList(); + if (messageIds.isEmpty) return const {}; + final grouped = >{ + for (final id in messageIds) id: [], + }; + for (final chunk in chunked(messageIds)) { + final where = reactions.messageId.isIn(chunk); + final rows = await _selectReactions(where); + for (final r in rows) { + grouped[r.messageId]!.add(r); + } + } + return grouped; + } + + /// Returns reactions for every id in [messageIds] that were added by + /// [userId], grouped by message id. + Future>> getReactionsForMessagesByUserId( + List messageIds, + String userId, + ) async { + if (messageIds.isEmpty) return const {}; + final grouped = >{ + for (final id in messageIds) id: [], + }; + for (final chunk in chunked(messageIds)) { + final where = + reactions.messageId.isIn(chunk) & reactions.userId.equals(userId); + final rows = await _selectReactions(where); + for (final r in rows) { + grouped[r.messageId]!.add(r); + } + } + return grouped; } /// Updates the reactions data with the new [reactionList] data @@ -57,4 +98,16 @@ class ReactionDao extends DatabaseAccessor (r) => r.messageId.isIn(messageIds), ); }); + + Future> _selectReactions(Expression where) { + final rows = select(reactions) + .join([leftOuterJoin(users, reactions.userId.equalsExp(users.id))]) + ..where(where) + ..orderBy([OrderingTerm.asc(reactions.createdAt)]); + return rows.map((row) { + final reactionEntity = row.readTable(reactions); + final userEntity = row.readTableOrNull(users); + return reactionEntity.toReaction(user: userEntity?.toUser()); + }).get(); + } } diff --git a/packages/stream_chat_persistence/lib/src/db/query_utils.dart b/packages/stream_chat_persistence/lib/src/db/query_utils.dart new file mode 100644 index 000000000..1a2f96659 --- /dev/null +++ b/packages/stream_chat_persistence/lib/src/db/query_utils.dart @@ -0,0 +1,18 @@ +import 'dart:math' as math; + +/// Splits a list of bind parameters into chunks safe for a single +/// `WHERE col IN (?, ?, ...)` SQLite statement. +/// +/// SQLite enforces `SQLITE_MAX_VARIABLE_NUMBER` (default 999 on the Android +/// build shipped via `sqlite3_flutter_libs`) on the number of `?` placeholders +/// per statement. Each id in the list becomes one placeholder, so an +/// unbounded id list would fail the query outright with +/// `too many SQL variables`. Callers that batch-load related rows by id +/// must run one SELECT per chunk and merge the results in Dart. The default +/// chunk size of 900 leaves headroom for any other bound parameters that +/// share the same statement (for example a `AND userId = ?` filter). +Iterable> chunked(List input, [int size = 900]) sync* { + for (var i = 0; i < input.length; i += size) { + yield input.sublist(i, math.min(i + size, input.length)); + } +} diff --git a/packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart index 15c8e48b2..871521472 100644 --- a/packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart @@ -378,6 +378,50 @@ void main() { }); }); + test( + 'getDraftMessageByCid hydrates parent message with draft=null ' + '(propagates fetchDraft=false to prevent recursion)', () async { + const cid = 'test:fetchDraftPropagation'; + const parentId = 'parent-msg'; + + final user = User(id: 'testUserId'); + final parentMessage = Message( + id: parentId, + user: user, + createdAt: DateTime.now(), + text: 'Parent', + ); + + await database.userDao.updateUsers([user]); + await database.channelDao.updateChannels([ChannelModel(cid: cid)]); + await database.messageDao.updateMessages(cid, [parentMessage]); + + await draftMessageDao.updateDraftMessages([ + Draft( + channelCid: cid, + parentId: parentId, + createdAt: DateTime.now(), + message: DraftMessage( + id: 'thread-draft', + text: 'reply draft', + parentId: parentId, + ), + ), + ]); + + final fetched = + await draftMessageDao.getDraftMessageByCid(cid, parentId: parentId); + + expect(fetched, isNotNull); + expect(fetched!.parentMessage, isNotNull); + expect(fetched.parentMessage!.id, parentId); + expect( + fetched.parentMessage!.draft, + isNull, + reason: 'parent message hydration must pass fetchDraft=false', + ); + }); + group('DraftMessages entity references', () { test( 'should delete draft messages when referenced channel is deleted', @@ -505,4 +549,101 @@ void main() { }, ); }); + + group('getDraftMessagesByParentIds', () { + test('returns empty map for empty input ids', () async { + final result = await draftMessageDao + .getDraftMessagesByParentIds('any-cid', const []); + expect(result, isEmpty); + }); + + test( + 'returns the thread draft per parent id within the given channel; ' + 'parents without a thread draft (or drafts in other channels) map ' + 'to null', () async { + const cidA = 'test:cidA'; + const cidB = 'test:cidB'; + const parentWithDraft = 'parent-with-draft'; + const parentWithoutDraft = 'parent-without-draft'; + const parentInOtherChannel = 'parent-in-other-channel'; + const parentUnknown = 'parent-unknown'; + + final user = User(id: 'testUserId'); + await database.userDao.updateUsers([user]); + await database.channelDao.updateChannels([ + ChannelModel(cid: cidA), + ChannelModel(cid: cidB), + ]); + await database.messageDao.updateMessages(cidA, [ + Message( + id: parentWithDraft, + user: user, + createdAt: DateTime.now(), + text: 'A', + ), + Message( + id: parentWithoutDraft, + user: user, + createdAt: DateTime.now(), + text: 'B', + ), + ]); + await database.messageDao.updateMessages(cidB, [ + Message( + id: parentInOtherChannel, + user: user, + createdAt: DateTime.now(), + text: 'C', + ), + ]); + // One draft in cidA on parentWithDraft, and one draft in cidB to + // confirm the cid filter excludes it from the cidA lookup. + await draftMessageDao.updateDraftMessages([ + Draft( + channelCid: cidA, + parentId: parentWithDraft, + createdAt: DateTime.now(), + message: DraftMessage( + text: 'draft in cidA', + parentId: parentWithDraft, + ), + ), + Draft( + channelCid: cidB, + parentId: parentInOtherChannel, + createdAt: DateTime.now(), + message: DraftMessage( + text: 'draft in cidB', + parentId: parentInOtherChannel, + ), + ), + ]); + + final result = await draftMessageDao.getDraftMessagesByParentIds( + cidA, + const [ + parentWithDraft, + parentWithoutDraft, + parentInOtherChannel, + parentUnknown, + ], + ); + + expect( + result.keys, + unorderedEquals([ + parentWithDraft, + parentWithoutDraft, + parentInOtherChannel, + parentUnknown, + ]), + ); + expect(result[parentWithDraft], isNotNull); + expect(result[parentWithDraft]!.parentId, parentWithDraft); + expect(result[parentWithDraft]!.message.text, 'draft in cidA'); + expect(result[parentWithoutDraft], isNull); + expect(result[parentInOtherChannel], isNull); + expect(result[parentUnknown], isNull); + }); + }); } diff --git a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart index 29cda8c93..b75e5443b 100644 --- a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart @@ -25,6 +25,12 @@ void main() { }) async { final channels = [ChannelModel(cid: cid)]; final users = List.generate(count, (index) => User(id: 'testUserId$index')); + // Strictly monotonic `createdAt` per message so SQL-side pagination + // filters (`WHERE createdAt < cutoff`, `ORDER BY createdAt ASC`) can't be + // confused by ties. Drift stores `DateTime` as integer Unix seconds by + // default, so the offset must be at least 1 second per row — otherwise + // sub-second offsets all round-trip onto the same second. + final baseTime = DateTime.now(); final messages = List.generate( count, (index) => Message( @@ -32,7 +38,7 @@ void main() { type: 'testType', user: users[index], channelRole: 'channel_member', - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -59,7 +65,7 @@ void main() { type: 'testType', user: users[index], channelRole: 'channel_member', - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -85,7 +91,7 @@ void main() { channelRole: 'channel_member', parentId: mapAllThreadToFirstMessage ? messages[0].id : messages[index].id, - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -294,7 +300,7 @@ void main() { const options = PaginationParams( limit: 15, lessThan: 'testThreadMessageId${cid}25', - greaterThanOrEqual: 'testThreadMessageId${cid}5', + greaterThan: 'testThreadMessageId${cid}5', ); // Messages should be empty initially @@ -320,6 +326,8 @@ void main() { ); expect(threadMessages.length, 15); expect(threadMessages.first.parentId, parentId); + expect(threadMessages.first.id, 'testThreadMessageId${cid}5'); + expect(threadMessages.last.id, 'testThreadMessageId${cid}19'); }); test('getMessagesByCid', () async { @@ -365,11 +373,11 @@ void main() { const cid = 'test:Cid'; const limit = 15; const lessThan = 'testMessageId${cid}25'; - const greaterThanOrEqual = 'testMessageId${cid}5'; + const greaterThan = 'testMessageId${cid}5'; const pagination = PaginationParams( limit: limit, lessThan: lessThan, - greaterThanOrEqual: greaterThanOrEqual, + greaterThan: greaterThan, ); // Should be empty initially @@ -389,8 +397,294 @@ void main() { messagePagination: pagination, ); expect(fetchedMessages.length, limit); + expect(fetchedMessages.first.id, 'testMessageId${cid}10'); expect(fetchedMessages.last.id, 'testMessageId${cid}24'); - expect(fetchedMessages.first.id != lessThan, true); + }); + + group('getMessagesByCid pagination', () { + const cid = 'test:Cid'; + + test('lessThan only trims messages from the end', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 25); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}24'); + }); + + test('greaterThan only trims messages from the start (exclusive)', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 24); + expect(fetchedMessages.first.id, 'testMessageId${cid}6'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('limit only keeps the last N messages', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams(limit: 15), + ); + + expect(fetchedMessages.length, 15); + expect(fetchedMessages.first.id, 'testMessageId${cid}15'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('lessThan id not in result set is a no-op', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'missing-id', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('greaterThan id not in result set is a no-op', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'missing-id', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('thread-reply id as cursor is a no-op (not visible in channel)', + () async { + // `_prepareTestData` inserts thread replies with `parentId` set and + // `showInChannel` left null — i.e. not visible in the channel query. + // Passing such an id as a cursor must resolve to a no-op so the main + // query falls back to returning the full channel slice. + await _prepareTestData(cid, count: 30, threads: true); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'testThreadMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('default PaginationParams() applies implicit limit of 10', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams(), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}20'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('default limit + lessThan returns last 10 of filtered set', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + lessThan: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}15'); + expect(fetchedMessages.last.id, 'testMessageId${cid}24'); + }); + + test('default limit + greaterThan returns first 10 after the pivot', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + greaterThan: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}6'); + expect(fetchedMessages.last.id, 'testMessageId${cid}15'); + }); + + test('lessThanOrEqual is inclusive of the pivot', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThanOrEqual: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 26); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}25'); + }); + + test('greaterThanOrEqual is inclusive of the pivot', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThanOrEqual: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 25); + expect(fetchedMessages.first.id, 'testMessageId${cid}5'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('default limit + lessThanOrEqual returns the pivot and 9 before', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + lessThanOrEqual: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}16'); + expect(fetchedMessages.last.id, 'testMessageId${cid}25'); + }); + + test('default limit + greaterThanOrEqual returns the pivot and 9 after', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + greaterThanOrEqual: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}5'); + expect(fetchedMessages.last.id, 'testMessageId${cid}14'); + }); + + test('cursor with tied createdAt does not skip or duplicate siblings', + () async { + // Three messages share an identical `createdAt`. The SQL ORDER BY uses + // the `(createdAt, id)` tuple, so within the trio the relative order is + // by id (lexicographic). A cursor at `msg_tieB` must split the trio + // cleanly: `msg_tieA` lands on the "before" side, `msg_tieC` on the + // "after" side. A `createdAt`-only WHERE predicate would collapse all + // three into the cursor's bucket and drop or keep them together. + final users = [User(id: 'tieUser')]; + await database.userDao.updateUsers(users); + await database.channelDao.updateChannels([ChannelModel(cid: cid)]); + + final tie = DateTime.now(); + final earlier = tie.subtract(const Duration(seconds: 1)); + final later = tie.add(const Duration(seconds: 1)); + + Message m(String id, DateTime t) => Message( + id: id, + user: users.first, + createdAt: t, + updatedAt: t, + text: id, + ); + + await messageDao.updateMessages(cid, [ + m('msg_pre', earlier), + m('msg_tieA', tie), + m('msg_tieB', tie), + m('msg_tieC', tie), + m('msg_post', later), + ]); + + final before = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'msg_tieB', + ), + ); + expect(before.map((m) => m.id).toList(), ['msg_pre', 'msg_tieA']); + + final after = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'msg_tieB', + ), + ); + expect(after.map((m) => m.id).toList(), ['msg_tieC', 'msg_post']); + + final atOrBefore = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThanOrEqual: 'msg_tieB', + ), + ); + expect( + atOrBefore.map((m) => m.id).toList(), + ['msg_pre', 'msg_tieA', 'msg_tieB'], + ); + + final atOrAfter = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThanOrEqual: 'msg_tieB', + ), + ); + expect( + atOrAfter.map((m) => m.id).toList(), + ['msg_tieB', 'msg_tieC', 'msg_post'], + ); + }); }); test('updateMessages', () async { @@ -435,6 +729,401 @@ void main() { ); }); + // Covers message enrichment logic + group('hydration', () { + const cid = 'test:Hydration'; + + Future _seedChannel(String channelCid) async { + await database.channelDao.updateChannels([ChannelModel(cid: channelCid)]); + } + + test('getMessageById hydrates multiple latest and own reactions', () async { + const messageId = 'msg-multi-reactions'; + await _seedChannel(cid); + + final dbUser = User(id: 'testUserId'); + final otherUser = User(id: 'otherUser'); + await database.userDao.updateUsers([dbUser, otherUser]); + + await messageDao.updateMessages(cid, [ + Message( + id: messageId, + user: dbUser, + text: 'Hello', + createdAt: DateTime.now(), + ), + ]); + + // 2 reactions by the DB user, 1 by another user. + await database.reactionDao.updateReactions([ + Reaction( + type: 'like', + messageId: messageId, + user: dbUser, + createdAt: DateTime.now(), + ), + Reaction( + type: 'love', + messageId: messageId, + user: dbUser, + createdAt: DateTime.now().add(const Duration(seconds: 1)), + ), + Reaction( + type: 'wow', + messageId: messageId, + user: otherUser, + createdAt: DateTime.now().add(const Duration(seconds: 2)), + ), + ]); + + final fetched = await messageDao.getMessageById(messageId); + expect(fetched, isNotNull); + expect(fetched!.latestReactions, hasLength(3)); + expect(fetched.ownReactions, hasLength(2)); + expect( + fetched.ownReactions!.every((r) => r.user?.id == dbUser.id), + isTrue, + ); + expect( + fetched.latestReactions!.map((r) => r.type).toSet(), + equals({'like', 'love', 'wow'}), + ); + }); + + test('getMessagesByCid hydrates reactions per row independently', () async { + await _seedChannel(cid); + + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + final baseTime = DateTime.now(); + final messages = List.generate( + 5, + (i) => Message( + id: 'msg-iso-$i', + user: dbUser, + text: 'Hello $i', + createdAt: baseTime.add(Duration(seconds: i)), + ), + ); + await messageDao.updateMessages(cid, messages); + + // 2 reactions per message, distinct types per-row. + final reactions = [ + for (var i = 0; i < messages.length; i++) ...[ + Reaction( + type: 'like-$i', + messageId: messages[i].id, + user: dbUser, + createdAt: baseTime.add(Duration(seconds: i)), + ), + Reaction( + type: 'love-$i', + messageId: messages[i].id, + user: dbUser, + createdAt: baseTime.add(Duration(seconds: i, milliseconds: 1)), + ), + ], + ]; + await database.reactionDao.updateReactions(reactions); + + final fetched = await messageDao.getMessagesByCid(cid); + expect(fetched, hasLength(5)); + for (final m in fetched) { + expect(m.latestReactions, hasLength(2)); + // Reactions must belong to this message only — no cross-contamination + // between rows when batched-hydration replaces the per-row queries. + expect( + m.latestReactions!.map((r) => r.type).toSet(), + equals( + {'like-${m.id.split('-').last}', 'love-${m.id.split('-').last}'}), + ); + } + }); + + test('getMessagesByCid hydrates poll with own + other-user votes', + () async { + const messageId = 'msg-with-poll'; + const pollId = 'poll-mixed'; + await _seedChannel(cid); + + final dbUser = User(id: 'testUserId'); + final otherUser = User(id: 'otherUser'); + await database.userDao.updateUsers([dbUser, otherUser]); + + const optionA = PollOption(id: 'opt-a', text: 'A'); + const optionB = PollOption(id: 'opt-b', text: 'B'); + + final poll = Poll( + id: pollId, + name: 'Pick one', + options: const [optionA, optionB], + createdBy: dbUser, + createdById: dbUser.id, + ); + await database.pollDao.updatePolls([poll]); + + await messageDao.updateMessages(cid, [ + Message( + id: messageId, + user: dbUser, + text: 'Vote please', + createdAt: DateTime.now(), + pollId: pollId, + ), + ]); + + // 2 own votes (one per option), 2 other-user votes, 1 own answer. + await database.pollVoteDao.updatePollVotes([ + PollVote( + id: 'v1', + pollId: pollId, + userId: dbUser.id, + user: dbUser, + optionId: optionA.id, + createdAt: DateTime.now(), + ), + PollVote( + id: 'v2', + pollId: pollId, + userId: dbUser.id, + user: dbUser, + optionId: optionB.id, + createdAt: DateTime.now().add(const Duration(seconds: 1)), + ), + PollVote( + id: 'v3', + pollId: pollId, + userId: otherUser.id, + user: otherUser, + optionId: optionA.id, + createdAt: DateTime.now().add(const Duration(seconds: 2)), + ), + PollVote( + id: 'v4', + pollId: pollId, + userId: otherUser.id, + user: otherUser, + optionId: optionB.id, + createdAt: DateTime.now().add(const Duration(seconds: 3)), + ), + PollVote( + id: 'a1', + pollId: pollId, + userId: dbUser.id, + user: dbUser, + answerText: 'because', + createdAt: DateTime.now().add(const Duration(seconds: 4)), + ), + ]); + + final fetched = await messageDao.getMessagesByCid(cid); + expect(fetched, hasLength(1)); + final hydratedPoll = fetched.first.poll; + expect(hydratedPoll, isNotNull); + expect(hydratedPoll!.id, pollId); + expect(hydratedPoll.latestVotesByOption[optionA.id], hasLength(2)); + expect(hydratedPoll.latestVotesByOption[optionB.id], hasLength(2)); + expect(hydratedPoll.latestAnswers, hasLength(1)); + // Own votes + own answer => 3 total in ownVotesAndAnswers. + expect(hydratedPoll.ownVotesAndAnswers, hasLength(3)); + expect( + hydratedPoll.ownVotesAndAnswers.every((v) => v.userId == dbUser.id), + isTrue, + ); + }); + + test( + 'getMessagesByCid hydrates thread draft when fetchDraft=true; ' + 'null when false', () async { + // `fetchDraft` attaches a thread draft (parentId == message.id) to its + // parent message, not the channel-level draft. See changelog entry for + // 9.15.0. + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + const parentId = 'msg-with-draft'; + await messageDao.updateMessages(cid, [ + Message( + id: parentId, + user: dbUser, + text: 'msg', + createdAt: DateTime.now(), + ), + ]); + + await database.draftMessageDao.updateDraftMessages([ + Draft( + channelCid: cid, + parentId: parentId, + createdAt: DateTime.now(), + message: DraftMessage( + id: 'draft-0', + text: 'unsent', + parentId: parentId, + ), + ), + ]); + + final withDraft = await messageDao.getMessagesByCid(cid); + expect(withDraft.first.draft, isNotNull); + expect(withDraft.first.draft!.message.text, 'unsent'); + expect(withDraft.first.draft!.parentId, parentId); + + final withoutDraft = + await messageDao.getMessagesByCid(cid, fetchDraft: false); + expect(withoutDraft.first.draft, isNull); + }); + + test( + 'getMessagesByCid hydrates quoted message with its own reactions ' + 'and poll', () async { + await _seedChannel(cid); + + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + const pollId = 'poll-on-quoted'; + const quotedMessageId = 'msg-quoted'; + const quotingMessageId = 'msg-quoting'; + + final poll = Poll( + id: pollId, + name: 'Quoted poll', + options: const [ + PollOption(id: 'q-opt-a', text: 'A'), + PollOption(id: 'q-opt-b', text: 'B'), + ], + createdBy: dbUser, + createdById: dbUser.id, + ); + await database.pollDao.updatePolls([poll]); + + final baseTime = DateTime.now(); + await messageDao.updateMessages(cid, [ + Message( + id: quotedMessageId, + user: dbUser, + text: 'first', + createdAt: baseTime, + pollId: pollId, + ), + Message( + id: quotingMessageId, + user: dbUser, + text: 'second', + createdAt: baseTime.add(const Duration(seconds: 1)), + quotedMessageId: quotedMessageId, + ), + ]); + + await database.reactionDao.updateReactions([ + Reaction( + type: 'like', + messageId: quotedMessageId, + user: dbUser, + createdAt: baseTime, + ), + ]); + + final fetched = await messageDao.getMessagesByCid(cid); + final quoting = fetched.firstWhere((m) => m.id == quotingMessageId); + expect(quoting.quotedMessage, isNotNull); + expect(quoting.quotedMessage!.id, quotedMessageId); + expect(quoting.quotedMessage!.latestReactions, hasLength(1)); + expect(quoting.quotedMessage!.ownReactions, hasLength(1)); + expect(quoting.quotedMessage!.poll, isNotNull); + expect(quoting.quotedMessage!.poll!.id, pollId); + }); + + test('getMessagesByCid resolves a depth-2 quote chain', () async { + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + final baseTime = DateTime.now(); + await messageDao.updateMessages(cid, [ + Message( + id: 'C', + user: dbUser, + text: 'root', + createdAt: baseTime, + ), + Message( + id: 'B', + user: dbUser, + text: 'mid', + createdAt: baseTime.add(const Duration(seconds: 1)), + quotedMessageId: 'C', + ), + Message( + id: 'A', + user: dbUser, + text: 'top', + createdAt: baseTime.add(const Duration(seconds: 2)), + quotedMessageId: 'B', + ), + ]); + + final fetched = await messageDao.getMessagesByCid(cid); + final top = fetched.firstWhere((m) => m.id == 'A'); + expect(top.quotedMessage?.id, 'B'); + expect(top.quotedMessage?.quotedMessage?.id, 'C'); + }); + + test('getMessagesByCid hydrates reactions under pagination', () async { + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + final baseTime = DateTime.now(); + final messages = List.generate( + 30, + (i) => Message( + id: 'p-msg-$i', + user: dbUser, + text: 'msg $i', + createdAt: baseTime.add(Duration(seconds: i)), + ), + ); + await messageDao.updateMessages(cid, messages); + + // 2 reactions per message; surviving rows after pagination must still + // carry their full reaction set. + final reactions = [ + for (final m in messages) ...[ + Reaction( + type: 'r1', + messageId: m.id, + user: dbUser, + createdAt: m.createdAt, + ), + Reaction( + type: 'r2', + messageId: m.id, + user: dbUser, + createdAt: m.createdAt.add(const Duration(milliseconds: 1)), + ), + ], + ]; + await database.reactionDao.updateReactions(reactions); + + final page = await messageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + lessThan: 'p-msg-25', + ), + ); + expect(page, hasLength(10)); + for (final m in page) { + expect(m.latestReactions, hasLength(2)); + expect(m.ownReactions, hasLength(2)); + expect(m.latestReactions!.every((r) => r.messageId == m.id), isTrue); + } + }); + }); + tearDown(() async { await database.disconnect(); }); diff --git a/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart index 0bd5166e5..11f324286 100644 --- a/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart @@ -426,6 +426,322 @@ void main() { ); }); + // Mirror of the `message_dao_test.dart` "hydration" group, scoped to the + // pinned-messages table + `pinnedMessageReactionDao`. Locks per-row + // hydration before the upcoming batched-hydration refactor. + group('hydration', () { + const cid = 'test:PinnedHydration'; + + Future _seedChannel(String channelCid) async { + await database.channelDao.updateChannels([ChannelModel(cid: channelCid)]); + } + + test('getMessageById hydrates multiple latest and own reactions', () async { + const messageId = 'pmsg-multi-reactions'; + await _seedChannel(cid); + + final dbUser = User(id: 'testUserId'); + final otherUser = User(id: 'otherUser'); + await database.userDao.updateUsers([dbUser, otherUser]); + + await pinnedMessageDao.updateMessages(cid, [ + Message( + id: messageId, + user: dbUser, + text: 'Hello', + createdAt: DateTime.now(), + ), + ]); + + await database.pinnedMessageReactionDao.updateReactions([ + Reaction( + type: 'like', + messageId: messageId, + user: dbUser, + createdAt: DateTime.now(), + ), + Reaction( + type: 'love', + messageId: messageId, + user: dbUser, + createdAt: DateTime.now().add(const Duration(seconds: 1)), + ), + Reaction( + type: 'wow', + messageId: messageId, + user: otherUser, + createdAt: DateTime.now().add(const Duration(seconds: 2)), + ), + ]); + + final fetched = await pinnedMessageDao.getMessageById(messageId); + expect(fetched, isNotNull); + expect(fetched!.latestReactions, hasLength(3)); + expect(fetched.ownReactions, hasLength(2)); + expect( + fetched.ownReactions!.every((r) => r.user?.id == dbUser.id), + isTrue, + ); + }); + + test('getMessagesByCid hydrates reactions per row independently', () async { + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + final baseTime = DateTime.now(); + final messages = List.generate( + 5, + (i) => Message( + id: 'pmsg-iso-$i', + user: dbUser, + text: 'Hello $i', + createdAt: baseTime.add(Duration(seconds: i)), + ), + ); + await pinnedMessageDao.updateMessages(cid, messages); + + final reactions = [ + for (var i = 0; i < messages.length; i++) ...[ + Reaction( + type: 'like-$i', + messageId: messages[i].id, + user: dbUser, + createdAt: baseTime.add(Duration(seconds: i)), + ), + Reaction( + type: 'love-$i', + messageId: messages[i].id, + user: dbUser, + createdAt: baseTime.add(Duration(seconds: i, milliseconds: 1)), + ), + ], + ]; + await database.pinnedMessageReactionDao.updateReactions(reactions); + + final fetched = await pinnedMessageDao.getMessagesByCid(cid); + expect(fetched, hasLength(5)); + for (final m in fetched) { + expect(m.latestReactions, hasLength(2)); + expect( + m.latestReactions!.map((r) => r.type).toSet(), + equals({ + 'like-${m.id.split('-').last}', + 'love-${m.id.split('-').last}', + }), + ); + } + }); + + test('getMessagesByCid hydrates poll with own + other-user votes', + () async { + const messageId = 'pmsg-with-poll'; + const pollId = 'ppoll-mixed'; + await _seedChannel(cid); + + final dbUser = User(id: 'testUserId'); + final otherUser = User(id: 'otherUser'); + await database.userDao.updateUsers([dbUser, otherUser]); + + const optionA = PollOption(id: 'p-opt-a', text: 'A'); + const optionB = PollOption(id: 'p-opt-b', text: 'B'); + + await database.pollDao.updatePolls([ + Poll( + id: pollId, + name: 'Pick one', + options: const [optionA, optionB], + createdBy: dbUser, + createdById: dbUser.id, + ), + ]); + + await pinnedMessageDao.updateMessages(cid, [ + Message( + id: messageId, + user: dbUser, + text: 'Vote please', + createdAt: DateTime.now(), + pollId: pollId, + ), + ]); + + await database.pollVoteDao.updatePollVotes([ + PollVote( + id: 'pv1', + pollId: pollId, + userId: dbUser.id, + user: dbUser, + optionId: optionA.id, + createdAt: DateTime.now(), + ), + PollVote( + id: 'pv2', + pollId: pollId, + userId: otherUser.id, + user: otherUser, + optionId: optionB.id, + createdAt: DateTime.now().add(const Duration(seconds: 1)), + ), + PollVote( + id: 'pa1', + pollId: pollId, + userId: dbUser.id, + user: dbUser, + answerText: 'because', + createdAt: DateTime.now().add(const Duration(seconds: 2)), + ), + ]); + + final fetched = await pinnedMessageDao.getMessagesByCid(cid); + expect(fetched, hasLength(1)); + final hydratedPoll = fetched.first.poll; + expect(hydratedPoll, isNotNull); + expect(hydratedPoll!.id, pollId); + expect(hydratedPoll.latestAnswers, hasLength(1)); + // 1 own vote + 1 own answer = 2. + expect(hydratedPoll.ownVotesAndAnswers, hasLength(2)); + }); + + test( + 'getMessagesByCid hydrates thread draft when fetchDraft=true; ' + 'null when false', () async { + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + const parentId = 'pmsg-with-draft'; + final parentMessage = Message( + id: parentId, + user: dbUser, + text: 'msg', + createdAt: DateTime.now(), + ); + // Pin the message and ALSO insert it into the main `messages` table: + // `DraftMessages.parentId` is FK-referenced against `Messages.id`, not + // `PinnedMessages.id`, so a thread draft needs the row in both places. + await pinnedMessageDao.updateMessages(cid, [parentMessage]); + await database.messageDao.updateMessages(cid, [parentMessage]); + + await database.draftMessageDao.updateDraftMessages([ + Draft( + channelCid: cid, + parentId: parentId, + createdAt: DateTime.now(), + message: DraftMessage( + id: 'pdraft-0', + text: 'unsent', + parentId: parentId, + ), + ), + ]); + + final withDraft = await pinnedMessageDao.getMessagesByCid(cid); + expect(withDraft.first.draft, isNotNull); + expect(withDraft.first.draft!.parentId, parentId); + + final withoutDraft = + await pinnedMessageDao.getMessagesByCid(cid, fetchDraft: false); + expect(withoutDraft.first.draft, isNull); + }); + + test( + 'getMessagesByCid hydrates quoted pinned message with its own ' + 'reactions and poll', () async { + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + const pollId = 'ppoll-on-quoted'; + const quotedMessageId = 'pmsg-quoted'; + const quotingMessageId = 'pmsg-quoting'; + + await database.pollDao.updatePolls([ + Poll( + id: pollId, + name: 'Quoted poll', + options: const [ + PollOption(id: 'pq-opt-a', text: 'A'), + PollOption(id: 'pq-opt-b', text: 'B'), + ], + createdBy: dbUser, + createdById: dbUser.id, + ), + ]); + + final baseTime = DateTime.now(); + await pinnedMessageDao.updateMessages(cid, [ + Message( + id: quotedMessageId, + user: dbUser, + text: 'first', + createdAt: baseTime, + pollId: pollId, + ), + Message( + id: quotingMessageId, + user: dbUser, + text: 'second', + createdAt: baseTime.add(const Duration(seconds: 1)), + quotedMessageId: quotedMessageId, + ), + ]); + + await database.pinnedMessageReactionDao.updateReactions([ + Reaction( + type: 'like', + messageId: quotedMessageId, + user: dbUser, + createdAt: baseTime, + ), + ]); + + final fetched = await pinnedMessageDao.getMessagesByCid(cid); + final quoting = fetched.firstWhere((m) => m.id == quotingMessageId); + expect(quoting.quotedMessage, isNotNull); + expect(quoting.quotedMessage!.id, quotedMessageId); + expect(quoting.quotedMessage!.latestReactions, hasLength(1)); + expect(quoting.quotedMessage!.ownReactions, hasLength(1)); + expect(quoting.quotedMessage!.poll, isNotNull); + expect(quoting.quotedMessage!.poll!.id, pollId); + }); + + test('getMessagesByCid resolves a depth-2 quote chain', () async { + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + final baseTime = DateTime.now(); + await pinnedMessageDao.updateMessages(cid, [ + Message( + id: 'pC', + user: dbUser, + text: 'root', + createdAt: baseTime, + ), + Message( + id: 'pB', + user: dbUser, + text: 'mid', + createdAt: baseTime.add(const Duration(seconds: 1)), + quotedMessageId: 'pC', + ), + Message( + id: 'pA', + user: dbUser, + text: 'top', + createdAt: baseTime.add(const Duration(seconds: 2)), + quotedMessageId: 'pB', + ), + ]); + + final fetched = await pinnedMessageDao.getMessagesByCid(cid); + final top = fetched.firstWhere((m) => m.id == 'pA'); + expect(top.quotedMessage?.id, 'pB'); + expect(top.quotedMessage?.quotedMessage?.id, 'pC'); + }); + }); + tearDown(() async { await database.disconnect(); }); diff --git a/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart index 44abc4a64..80fe9a655 100644 --- a/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart @@ -81,20 +81,27 @@ void main() { test('getReactionsByUserId', () async { const messageId = 'testMessageId'; const userId = 'testUserId'; + const otherUserId = 'otherUserid'; // Should be empty initially final reactions = await pinnedMessageReactionDao.getReactionsByUserId(messageId, userId); expect(reactions, isEmpty); - // Adding sample reactions + // Adding sample reactions from the target user. final insertedReactions = await _prepareReactionData(messageId, userId: userId); expect(insertedReactions, isNotEmpty); - // Fetched reaction length should match inserted reactions length. + // Adding sample reactions from other users on the same message. + final otherInsertedReactions = + await _prepareReactionData(messageId, userId: otherUserId); + expect(otherInsertedReactions, isNotEmpty); + + // Fetched reaction length should match the target user's reactions only. // Every reaction messageId should match the provided messageId. - // Every reaction userId should match the provided userId. + // Every reaction userId should match the provided userId — i.e. reactions + // from other users on the same message must be filtered out. final fetchedReactions = await pinnedMessageReactionDao.getReactionsByUserId(messageId, userId); expect(fetchedReactions.length, insertedReactions.length); @@ -143,6 +150,19 @@ void main() { ); }); + test( + 'getReactions returns empty for a message id with no reactions, ' + 'even when reactions exist for other messages', () async { + const messageWithReactions = 'pmsg-A'; + const messageWithoutReactions = 'pmsg-B'; + + await _prepareReactionData(messageWithReactions); + + final fetched = + await pinnedMessageReactionDao.getReactions(messageWithoutReactions); + expect(fetched, isEmpty); + }); + group('deleteReactionsByMessageIds', () { const messageId1 = 'testMessageId1'; const messageId2 = 'testMessageId2'; diff --git a/packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart index 3f0b55167..37fe19f90 100644 --- a/packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart @@ -126,6 +126,82 @@ void main() { expect(fetchedPoll!.id, pollToFetch.id); }); + test('getPollById returns null for an unknown id', () async { + final poll = await pollDao.getPollById('does-not-exist'); + expect(poll, isNull); + }); + + test( + 'getPollById hydrates ownVotesAndAnswers + latestVotesByOption + ' + 'latestAnswers from mixed-user/mixed-option votes', () async { + const pollId = 'poll-mixed'; + const optionA = PollOption(id: 'opt-a', text: 'A'); + const optionB = PollOption(id: 'opt-b', text: 'B'); + + // The test database is built with `testUserId`, so a vote with that + // userId is "own". Other userIds end up only in the latest-by-option + // / latest-answers buckets. + final dbUser = User(id: 'testUserId'); + final otherUser = User(id: 'otherUser'); + await database.userDao.updateUsers([dbUser, otherUser]); + await pollDao.updatePolls([ + Poll( + id: pollId, + name: 'Pick one', + options: const [optionA, optionB], + createdBy: dbUser, + createdById: dbUser.id, + ), + ]); + await database.pollVoteDao.updatePollVotes([ + PollVote( + id: 'own-vote-a', + pollId: pollId, + userId: dbUser.id, + user: dbUser, + optionId: optionA.id, + createdAt: DateTime.now(), + ), + PollVote( + id: 'other-vote-b', + pollId: pollId, + userId: otherUser.id, + user: otherUser, + optionId: optionB.id, + createdAt: DateTime.now().add(const Duration(seconds: 1)), + ), + PollVote( + id: 'own-answer', + pollId: pollId, + userId: dbUser.id, + user: dbUser, + answerText: 'because', + createdAt: DateTime.now().add(const Duration(seconds: 2)), + ), + PollVote( + id: 'other-answer', + pollId: pollId, + userId: otherUser.id, + user: otherUser, + answerText: 'just because', + createdAt: DateTime.now().add(const Duration(seconds: 3)), + ), + ]); + + final fetched = await pollDao.getPollById(pollId); + + expect(fetched, isNotNull); + expect(fetched!.latestVotesByOption[optionA.id], hasLength(1)); + expect(fetched.latestVotesByOption[optionB.id], hasLength(1)); + expect(fetched.latestAnswers, hasLength(2)); + // 1 own vote + 1 own answer => 2. + expect(fetched.ownVotesAndAnswers, hasLength(2)); + expect( + fetched.ownVotesAndAnswers.every((v) => v.userId == dbUser.id), + isTrue, + ); + }); + test('deletePollsByIds', () async { // Preparing test data final insertedPolls = await _preparePollData(); @@ -164,6 +240,89 @@ void main() { expect(fetchedPollVotes, isEmpty); }); + group('getPollsByIds', () { + test('returns empty map for empty input ids', () async { + final result = await pollDao.getPollsByIds(const []); + expect(result, isEmpty); + }); + + test( + 'returns a poll per id with votes/answers/ownVotes grouped; ids not ' + 'in the cache map to null', () async { + const optionA = PollOption(id: 'opt-a', text: 'A'); + const optionB = PollOption(id: 'opt-b', text: 'B'); + + final dbUser = User(id: 'testUserId'); + final otherUser = User(id: 'otherUser'); + await database.userDao.updateUsers([dbUser, otherUser]); + await pollDao.updatePolls([ + Poll( + id: 'poll-1', + name: 'Pick one', + options: const [optionA, optionB], + createdBy: dbUser, + createdById: dbUser.id, + ), + Poll( + id: 'poll-2', + name: 'No votes', + options: const [optionA, optionB], + createdBy: dbUser, + createdById: dbUser.id, + ), + ]); + await database.pollVoteDao.updatePollVotes([ + PollVote( + id: 'p1-own-a', + pollId: 'poll-1', + userId: dbUser.id, + user: dbUser, + optionId: optionA.id, + createdAt: DateTime.now(), + ), + PollVote( + id: 'p1-other-b', + pollId: 'poll-1', + userId: otherUser.id, + user: otherUser, + optionId: optionB.id, + createdAt: DateTime.now(), + ), + PollVote( + id: 'p1-own-answer', + pollId: 'poll-1', + userId: dbUser.id, + user: dbUser, + answerText: 'because', + createdAt: DateTime.now(), + ), + ]); + + final result = await pollDao + .getPollsByIds(const ['poll-1', 'poll-2', 'poll-missing']); + + expect( + result.keys, unorderedEquals(['poll-1', 'poll-2', 'poll-missing'])); + expect(result['poll-missing'], isNull); + + final poll1 = result['poll-1']!; + expect(poll1.latestVotesByOption[optionA.id], hasLength(1)); + expect(poll1.latestVotesByOption[optionB.id], hasLength(1)); + expect(poll1.latestAnswers, hasLength(1)); + // 1 own vote + 1 own answer = 2. + expect(poll1.ownVotesAndAnswers, hasLength(2)); + expect( + poll1.ownVotesAndAnswers.every((v) => v.userId == dbUser.id), + isTrue, + ); + + final poll2 = result['poll-2']!; + expect(poll2.latestVotesByOption, isEmpty); + expect(poll2.latestAnswers, isEmpty); + expect(poll2.ownVotesAndAnswers, isEmpty); + }); + }); + tearDown(() async { await database.disconnect(); }); diff --git a/packages/stream_chat_persistence/test/src/dao/poll_vote_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/poll_vote_dao_test.dart index 093aac84b..75c8ac6f1 100644 --- a/packages/stream_chat_persistence/test/src/dao/poll_vote_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/poll_vote_dao_test.dart @@ -171,6 +171,49 @@ void main() { }); }); + group('getPollVotesForPolls', () { + test('returns empty map for empty input ids', () async { + final result = await pollVoteDao.getPollVotesForPolls(const []); + expect(result, isEmpty); + }); + + test( + 'returns votes grouped by poll id; polls with no votes (or unknown ' + 'ids) map to an empty list', () async { + const pollWithVotes = 'poll-with-votes'; + const pollWithoutVotes = 'poll-without-votes'; + const pollUnknown = 'poll-unknown'; + + // _preparePollVoteData seeds the poll plus 3 votes per option (9 + // total) and inserts the necessary users. + final seededVotes = await _preparePollVoteData(pollWithVotes); + // Seed a second poll with no votes — just to confirm cross-poll + // isolation (votes from pollWithVotes must not leak into its bucket). + await database.pollDao.updatePolls([ + Poll( + id: pollWithoutVotes, + name: 'No votes here', + options: const [PollOption(id: 'opt', text: 'X')], + createdById: 'user-0', + ), + ]); + + final result = await pollVoteDao.getPollVotesForPolls( + const [pollWithVotes, pollWithoutVotes, pollUnknown], + ); + + expect(result.keys, + unorderedEquals([pollWithVotes, pollWithoutVotes, pollUnknown])); + expect(result[pollWithVotes], hasLength(seededVotes.length)); + expect( + result[pollWithVotes]!.every((v) => v.pollId == pollWithVotes), + isTrue, + ); + expect(result[pollWithoutVotes], isEmpty); + expect(result[pollUnknown], isEmpty); + }); + }); + tearDown(() async { await database.disconnect(); }); diff --git a/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart index 7fa3569e4..7ec4a772c 100644 --- a/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart @@ -80,19 +80,26 @@ void main() { test('getReactionsByUserId', () async { const messageId = 'testMessageId'; const userId = 'testUserId'; + const otherUserId = 'otherUserid'; // Should be empty initially final reactions = await reactionDao.getReactionsByUserId(messageId, userId); expect(reactions, isEmpty); - // Adding sample reactions + // Adding sample reactions from the target user. final insertedReactions = await _prepareReactionData(messageId, userId: userId); expect(insertedReactions, isNotEmpty); - // Fetched reaction length should match inserted reactions length. + // Adding sample reactions from other users on the same message. + final otherInsertedReactions = + await _prepareReactionData(messageId, userId: otherUserId); + expect(otherInsertedReactions, isNotEmpty); + + // Fetched reaction length should match the target user's reactions only. // Every reaction messageId should match the provided messageId. - // Every reaction userId should match the provided userId. + // Every reaction userId should match the provided userId — i.e. reactions + // from other users on the same message must be filtered out. final fetchedReactions = await reactionDao.getReactionsByUserId(messageId, userId); expect(fetchedReactions.length, insertedReactions.length); @@ -140,6 +147,153 @@ void main() { ); }); + test('getReactionsForMessages chunks transparently when given >900 ids', + () async { + // 1,200 ids exceeds the historical SQLITE_MAX_VARIABLE_NUMBER cap (999) + // for a single `WHERE messageId IN (?, ?, ...)` statement — the helper + // must run the SELECT in chunks and merge. + const cid = 'test:ChunkedReactions'; + const total = 1200; + + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + await database.channelDao.updateChannels([ChannelModel(cid: cid)]); + + final baseTime = DateTime.now(); + final messages = List.generate( + total, + (i) => Message( + id: 'cmsg-$i', + user: dbUser, + text: 'Hello $i', + createdAt: baseTime.add(Duration(seconds: i)), + ), + ); + await database.messageDao.updateMessages(cid, messages); + + // Seed 1 reaction on every odd-indexed message (600 total) so we can + // verify both that the chunked query returns rows AND that ids without + // reactions still appear in the dense map with an empty list. + final reactions = [ + for (var i = 0; i < total; i++) + if (i.isOdd) + Reaction( + type: 'like', + messageId: messages[i].id, + user: dbUser, + createdAt: baseTime.add(Duration(seconds: i)), + ), + ]; + await reactionDao.updateReactions(reactions); + + final ids = messages.map((m) => m.id).toList(); + final grouped = await reactionDao.getReactionsForMessages(ids); + + expect(grouped, hasLength(total), + reason: 'every input id must be a key (dense-map contract)'); + var withReactions = 0; + var empty = 0; + for (var i = 0; i < total; i++) { + final list = grouped['cmsg-$i']; + expect(list, isNotNull); + if (i.isOdd) { + expect(list, hasLength(1)); + expect(list!.first.messageId, 'cmsg-$i'); + withReactions++; + } else { + expect(list, isEmpty); + empty++; + } + } + expect(withReactions, total ~/ 2); + expect(empty, total ~/ 2); + }); + + test( + 'getReactions returns empty for a message id with no reactions, ' + 'even when reactions exist for other messages', () async { + // Locks per-id isolation: the upcoming batched `WHERE messageId IN (...)` + // path must not leak rows across ids when only one is queried. + const messageWithReactions = 'msg-A'; + const messageWithoutReactions = 'msg-B'; + + await _prepareReactionData(messageWithReactions); + + final fetched = await reactionDao.getReactions(messageWithoutReactions); + expect(fetched, isEmpty); + }); + + group('getReactionsForMessagesByUserId', () { + test('returns empty map for empty input ids', () async { + final result = await reactionDao + .getReactionsForMessagesByUserId(const [], 'someUser'); + expect(result, isEmpty); + }); + + test( + "returns the given user's reactions per message id; message ids " + 'with no reactions from that user map to an empty list', () async { + const cid = 'test:Cid'; + const targetUser = 'targetUser'; + const otherUser = 'otherUser'; + const msgWithOwn = 'msg-with-own'; + const msgWithoutOwn = 'msg-without-own'; + const msgUnknown = 'msg-unknown'; + + final users = [User(id: targetUser), User(id: otherUser)]; + await database.userDao.updateUsers(users); + await database.channelDao.updateChannels([ChannelModel(cid: cid)]); + await database.messageDao.updateMessages(cid, [ + Message( + id: msgWithOwn, + user: users.first, + createdAt: DateTime.now(), + text: 'a', + ), + Message( + id: msgWithoutOwn, + user: users.first, + createdAt: DateTime.now(), + text: 'b', + ), + ]); + // msgWithOwn: 1 own + 1 other; msgWithoutOwn: only other-user reaction. + await reactionDao.updateReactions([ + Reaction( + type: 'like', + messageId: msgWithOwn, + userId: targetUser, + createdAt: DateTime.now(), + ), + Reaction( + type: 'wow', + messageId: msgWithOwn, + userId: otherUser, + createdAt: DateTime.now(), + ), + Reaction( + type: 'love', + messageId: msgWithoutOwn, + userId: otherUser, + createdAt: DateTime.now(), + ), + ]); + + final result = await reactionDao.getReactionsForMessagesByUserId( + const [msgWithOwn, msgWithoutOwn, msgUnknown], + targetUser, + ); + + expect(result.keys, + unorderedEquals([msgWithOwn, msgWithoutOwn, msgUnknown])); + expect(result[msgWithOwn], hasLength(1)); + expect(result[msgWithOwn]!.single.userId, targetUser); + expect(result[msgWithOwn]!.single.type, 'like'); + expect(result[msgWithoutOwn], isEmpty); + expect(result[msgUnknown], isEmpty); + }); + }); + group('deleteReactionsByMessageIds', () { const messageId1 = 'testMessageId1'; const messageId2 = 'testMessageId2';