Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion packages/stream_chat_persistence/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
# Upcoming
## Upcoming Changes

🚀 Performance

- Reduce the number of DB reads in the `ChatPersistenceClient.getChannelStates` method.
- Read only the messages matching the `PaginationParams` from DB when calling `MessageDao.getMessagesByCid` instead of reading all messages for the channel and applying pagination in memory.
- Read only the thread replies matching the `PaginationParams` from DB when calling `MessageDao.getThreadMessagesByParentId` instead of reading all replies for the thread and applying pagination in memory.
- Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory.
- Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory.

🐞 Fixed

- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`.
- `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour.
- `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot.
- `MessageDao.getThreadMessagesByParentId` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor reply), in addition to the existing `lessThan`/`greaterThan`.
- `MessageDao.getThreadMessagesByParentId` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour.
- `MessageDao.getThreadMessagesByParentId` with a `limit` now returns the page of replies closest to the pivot (immediately before a `lessThan`/`lessThanOrEqual` cursor, immediately after a `greaterThan`/`greaterThanOrEqual` cursor, or the thread tail when no cursor is set), instead of always returning the oldest replies of the thread.


## 9.24.0

Expand Down
253 changes: 187 additions & 66 deletions packages/stream_chat_persistence/lib/src/dao/message_dao.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import 'dart:math';

import 'package:drift/drift.dart';
import 'package:stream_chat/stream_chat.dart';
import 'package:stream_chat_persistence/src/db/drift_chat_database.dart';
Expand Down Expand Up @@ -126,42 +124,89 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
String parentId, {
PaginationParams? options,
}) async {
final msgList = await Future.wait(await (select(messages).join([
final (
lessThanCursor,
lessThanOrEqualCursor,
greaterThanCursor,
greaterThanOrEqualCursor,
) = await (
_lookupThreadCursor(parentId, options?.lessThan),
_lookupThreadCursor(parentId, options?.lessThanOrEqual),
_lookupThreadCursor(parentId, options?.greaterThan),
_lookupThreadCursor(parentId, options?.greaterThanOrEqual),
).wait;

// When the caller is paginating forward (greaterThan / greaterThanOrEqual
// only), order ASC so the SQL `LIMIT` retains the N replies immediately
// AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N replies
// closest to a `lessThan` cursor (or the thread's tail when no cursor is
// set). The final result is always reshaped to ASC for display.
final isForwardPagination =
(greaterThanCursor != null || greaterThanOrEqualCursor != null) &&
lessThanCursor == null &&
lessThanOrEqualCursor == null;

final orderBy = isForwardPagination
? [
OrderingTerm.asc(messages.createdAt),
OrderingTerm.asc(messages.id),
]
: [
OrderingTerm.desc(messages.createdAt),
OrderingTerm.desc(messages.id),
];

final query = select(messages).join([
leftOuterJoin(_users, messages.userId.equalsExp(_users.id)),
leftOuterJoin(
_pinnedByUsers,
messages.pinnedByUserId.equalsExp(_pinnedByUsers.id),
),
])
..where(messages.parentId.isNotNull())
..where(messages.parentId.equals(parentId))
..orderBy([OrderingTerm.asc(messages.createdAt)]))
.map(_messageFromJoinRow)
.get());

if (msgList.isNotEmpty) {
if (options?.lessThan != null) {
final lessThanIndex = msgList.indexWhere(
(m) => m.id == options!.lessThan,
);
if (lessThanIndex != -1) {
msgList.removeRange(lessThanIndex, msgList.length);
}
}
if (options?.greaterThan != null) {
final greaterThanIndex = msgList.indexWhere(
(m) => m.id == options!.greaterThan,
);
if (greaterThanIndex != -1) {
msgList.removeRange(0, greaterThanIndex);
}
}
final limit = options?.limit;
if (limit != null && limit > 0) {
return msgList.take(limit).toList();
}
..where(messages.parentId.equals(parentId))
..orderBy(orderBy);

// Cursor predicates compare the full `(createdAt, id)` tuple — the same
// key used in ORDER BY — so replies sharing a `createdAt` with the cursor
// fall on the correct side of the boundary. Filtering on `createdAt`
// alone would skip or repeat those siblings across pages.
if (lessThanCursor case final c?) {
query.where(
messages.createdAt.isSmallerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isSmallerThanValue(c.id)),
);
}
if (lessThanOrEqualCursor case final c?) {
query.where(
messages.createdAt.isSmallerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isSmallerOrEqualValue(c.id)),
);
}
if (greaterThanCursor case final c?) {
query.where(
messages.createdAt.isBiggerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isBiggerThanValue(c.id)),
);
}
return msgList;
if (greaterThanOrEqualCursor case final c?) {
query.where(
messages.createdAt.isBiggerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isBiggerOrEqualValue(c.id)),
);
}

if (options != null) {
query.limit(options.limit);
}

final rows = await query.get();
final orderedRows = isForwardPagination ? rows : rows.reversed.toList();

return Future.wait(orderedRows.map(_messageFromJoinRow));
}

/// Returns all the messages of a channel by matching
Expand All @@ -171,6 +216,38 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
bool fetchDraft = true,
PaginationParams? messagePagination,
}) async {
final (
lessThanCursor,
lessThanOrEqualCursor,
greaterThanCursor,
greaterThanOrEqualCursor,
) = await (
_lookupCursor(messagePagination?.lessThan),
_lookupCursor(messagePagination?.lessThanOrEqual),
_lookupCursor(messagePagination?.greaterThan),
_lookupCursor(messagePagination?.greaterThanOrEqual),
).wait;

// When the caller is paginating forward (greaterThan / greaterThanOrEqual
// only), order ASC so the SQL `LIMIT` retains the N messages immediately
// AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N most
// recent (closest to a `lessThan` cursor, or the channel tail when no
// cursor is set). The final result is always reshaped to ASC for display.
final isForwardPagination =
(greaterThanCursor != null || greaterThanOrEqualCursor != null) &&
lessThanCursor == null &&
lessThanOrEqualCursor == null;

final orderBy = isForwardPagination
? [
OrderingTerm.asc(messages.createdAt),
OrderingTerm.asc(messages.id),
]
: [
OrderingTerm.desc(messages.createdAt),
OrderingTerm.desc(messages.id),
];

final query = select(messages).join([
leftOuterJoin(_users, messages.userId.equalsExp(_users.id)),
leftOuterJoin(
Expand All @@ -180,44 +257,52 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
])
..where(messages.channelCid.equals(cid))
..where(messages.parentId.isNull() | messages.showInChannel.equals(true))
..orderBy([OrderingTerm.asc(messages.createdAt)]);
..orderBy(orderBy);

final result = await query.get();
if (result.isEmpty) return [];

final msgList = await Future.wait(
result.map(
(row) => _messageFromJoinRow(
row,
fetchDraft: fetchDraft,
),
),
);
// Cursor predicates compare the full `(createdAt, id)` tuple — the same
// key used in ORDER BY — so messages sharing a `createdAt` with the cursor
// fall on the correct side of the boundary. Filtering on `createdAt` alone
// would skip or repeat those siblings across pages.
if (lessThanCursor case final c?) {
query.where(
messages.createdAt.isSmallerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isSmallerThanValue(c.id)),
);
}
if (lessThanOrEqualCursor case final c?) {
query.where(
messages.createdAt.isSmallerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isSmallerOrEqualValue(c.id)),
);
}
if (greaterThanCursor case final c?) {
query.where(
messages.createdAt.isBiggerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isBiggerThanValue(c.id)),
);
}
if (greaterThanOrEqualCursor case final c?) {
query.where(
messages.createdAt.isBiggerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isBiggerOrEqualValue(c.id)),
);
}

if (msgList.isNotEmpty) {
if (messagePagination?.lessThan != null) {
final lessThanIndex = msgList.indexWhere(
(m) => m.id == messagePagination!.lessThan,
);
if (lessThanIndex != -1) {
msgList.removeRange(lessThanIndex, msgList.length);
}
}
if (messagePagination?.greaterThan != null) {
final greaterThanIndex = msgList.indexWhere(
(m) => m.id == messagePagination!.greaterThan,
);
if (greaterThanIndex != -1) {
msgList.removeRange(0, greaterThanIndex);
}
}
if (messagePagination?.limit != null) {
return msgList
.skip(max(0, msgList.length - messagePagination!.limit))
.toList();
}
if (messagePagination != null) {
query.limit(messagePagination.limit);
}
return msgList;

final rows = await query.get();
final orderedRows = isForwardPagination ? rows : rows.reversed.toList();

return Future.wait(
orderedRows
.map((row) => _messageFromJoinRow(row, fetchDraft: fetchDraft)),
);
}

/// Updates the message data of a particular channel with
Expand All @@ -241,4 +326,40 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
(batch) => batch.insertAllOnConflictUpdate(messages, entities),
);
}

/// Returns the `(createdAt, id)` cursor for the message with [id] in the
/// local cache, or `null` if [id] is null, the message isn't cached, or
/// isn't visible in the channel (i.e. a thread reply with
/// `showInChannel = false`).
Future<({DateTime createdAt, String id})?> _lookupCursor(String? id) async {
if (id == null) return null;
final createdAt = await (selectOnly(messages)
..addColumns([messages.createdAt])
..where(messages.id.equals(id))
..where(
messages.parentId.isNull() | messages.showInChannel.equals(true),
))
.map((row) => row.read(messages.createdAt))
.getSingleOrNull();
if (createdAt == null) return null;
return (createdAt: createdAt, id: id);
}

/// Returns the `(createdAt, id)` cursor for the thread reply with [id]
/// under [parentId] in the local cache, or `null` if [id] is null or no
/// such reply is cached.
Future<({DateTime createdAt, String id})?> _lookupThreadCursor(
String parentId,
String? id,
) async {
if (id == null) return null;
final createdAt = await (selectOnly(messages)
..addColumns([messages.createdAt])
..where(messages.id.equals(id))
..where(messages.parentId.equals(parentId)))
.map((row) => row.read(messages.createdAt))
.getSingleOrNull();
if (createdAt == null) return null;
return (createdAt: createdAt, id: id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor<DriftChatDatabase>

/// Returns all the reactions of a particular message by matching
/// [Reactions.messageId] with [messageId]
Future<List<Reaction>> getReactions(String messageId) =>
(select(pinnedMessageReactions).join([
leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)),
])
..where(pinnedMessageReactions.messageId.equals(messageId))
..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)]))
.map((rows) {
final userEntity = rows.readTableOrNull(users);
final reactionEntity = rows.readTable(pinnedMessageReactions);
return reactionEntity.toReaction(user: userEntity?.toUser());
}).get();
Future<List<Reaction>> getReactions(String messageId) {
final where = pinnedMessageReactions.messageId.equals(messageId);
return _selectReactions(where);
}

/// Returns all the reactions of a particular message
/// added by a particular user by matching
Expand All @@ -35,9 +28,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor<DriftChatDatabase>
Future<List<Reaction>> getReactionsByUserId(
String messageId,
String userId,
) async {
final reactions = await getReactions(messageId);
return reactions.where((it) => it.userId == userId).toList();
) {
final where = pinnedMessageReactions.messageId.equals(messageId) &
pinnedMessageReactions.userId.equals(userId);
return _selectReactions(where);
}

/// Updates the reactions data with the new [reactionList] data
Expand All @@ -57,4 +51,17 @@ class PinnedMessageReactionDao extends DatabaseAccessor<DriftChatDatabase>
(r) => r.messageId.isIn(messageIds),
);
});

Future<List<Reaction>> _selectReactions(Expression<bool> where) {
final rows = select(pinnedMessageReactions).join([
leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)),
])
..where(where)
..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)]);
return rows.map((row) {
final reactionEntity = row.readTable(pinnedMessageReactions);
final userEntity = row.readTableOrNull(users);
return reactionEntity.toReaction(user: userEntity?.toUser());
}).get();
}
}
Loading
Loading