Skip to content

Commit 58cb7b1

Browse files
authored
perf(persistence): Reduce the number of read message per channel from DB when paginating (part 1) (#2679)
* refactor(dao): Filter reactions by userId at DB level. * refactor(dao): Filter reactions by userId at DB level (pinned messages). * refactor(dao): optimize message retrieval with SQL-side pagination filters * refactor(dao): fix formatting * refactor(dao): Update CHANGELOG.md * refactor(dao): Apply formatting * refactor(dao): enhance message pagination logic to support inclusive cursors * refactor(dao): Update docs * refactor(dao): Add message.id tiebreakier * refactor(dao): Add message.id tiebreaker * refactor(dao): Update CHANGELOG.md
1 parent b8d222f commit 58cb7b1

7 files changed

Lines changed: 466 additions & 79 deletions

File tree

packages/stream_chat_persistence/CHANGELOG.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,18 @@
1-
# Upcoming
1+
## Upcoming Changes
22

33
🚀 Performance
44

55
- Reduce the number of DB reads in the `ChatPersistenceClient.getChannelStates` method.
6+
- Read only the messages matching the `PaginationParams` from DB when calling `MessageDao.getMessagesByCid` instead of reading all messages for the channel and applying pagination in memory.
7+
- Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory.
8+
- Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory.
9+
10+
🐞 Fixed
11+
12+
- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`.
13+
- `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour.
14+
- `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot.
15+
616

717
🐛 Fixed
818

packages/stream_chat_persistence/lib/src/dao/message_dao.dart

Lines changed: 93 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import 'dart:math';
2-
31
import 'package:drift/drift.dart';
42
import 'package:stream_chat/stream_chat.dart';
53
import 'package:stream_chat_persistence/src/db/drift_chat_database.dart';
@@ -171,6 +169,38 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
171169
bool fetchDraft = true,
172170
PaginationParams? messagePagination,
173171
}) async {
172+
final (
173+
lessThanCursor,
174+
lessThanOrEqualCursor,
175+
greaterThanCursor,
176+
greaterThanOrEqualCursor,
177+
) = await (
178+
_lookupCursor(messagePagination?.lessThan),
179+
_lookupCursor(messagePagination?.lessThanOrEqual),
180+
_lookupCursor(messagePagination?.greaterThan),
181+
_lookupCursor(messagePagination?.greaterThanOrEqual),
182+
).wait;
183+
184+
// When the caller is paginating forward (greaterThan / greaterThanOrEqual
185+
// only), order ASC so the SQL `LIMIT` retains the N messages immediately
186+
// AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N most
187+
// recent (closest to a `lessThan` cursor, or the channel tail when no
188+
// cursor is set). The final result is always reshaped to ASC for display.
189+
final isForwardPagination =
190+
(greaterThanCursor != null || greaterThanOrEqualCursor != null) &&
191+
lessThanCursor == null &&
192+
lessThanOrEqualCursor == null;
193+
194+
final orderBy = isForwardPagination
195+
? [
196+
OrderingTerm.asc(messages.createdAt),
197+
OrderingTerm.asc(messages.id),
198+
]
199+
: [
200+
OrderingTerm.desc(messages.createdAt),
201+
OrderingTerm.desc(messages.id),
202+
];
203+
174204
final query = select(messages).join([
175205
leftOuterJoin(_users, messages.userId.equalsExp(_users.id)),
176206
leftOuterJoin(
@@ -180,44 +210,52 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
180210
])
181211
..where(messages.channelCid.equals(cid))
182212
..where(messages.parentId.isNull() | messages.showInChannel.equals(true))
183-
..orderBy([OrderingTerm.asc(messages.createdAt)]);
213+
..orderBy(orderBy);
184214

185-
final result = await query.get();
186-
if (result.isEmpty) return [];
187-
188-
final msgList = await Future.wait(
189-
result.map(
190-
(row) => _messageFromJoinRow(
191-
row,
192-
fetchDraft: fetchDraft,
193-
),
194-
),
195-
);
215+
// Cursor predicates compare the full `(createdAt, id)` tuple — the same
216+
// key used in ORDER BY — so messages sharing a `createdAt` with the cursor
217+
// fall on the correct side of the boundary. Filtering on `createdAt` alone
218+
// would skip or repeat those siblings across pages.
219+
if (lessThanCursor case final c?) {
220+
query.where(
221+
messages.createdAt.isSmallerThanValue(c.createdAt) |
222+
(messages.createdAt.equals(c.createdAt) &
223+
messages.id.isSmallerThanValue(c.id)),
224+
);
225+
}
226+
if (lessThanOrEqualCursor case final c?) {
227+
query.where(
228+
messages.createdAt.isSmallerThanValue(c.createdAt) |
229+
(messages.createdAt.equals(c.createdAt) &
230+
messages.id.isSmallerOrEqualValue(c.id)),
231+
);
232+
}
233+
if (greaterThanCursor case final c?) {
234+
query.where(
235+
messages.createdAt.isBiggerThanValue(c.createdAt) |
236+
(messages.createdAt.equals(c.createdAt) &
237+
messages.id.isBiggerThanValue(c.id)),
238+
);
239+
}
240+
if (greaterThanOrEqualCursor case final c?) {
241+
query.where(
242+
messages.createdAt.isBiggerThanValue(c.createdAt) |
243+
(messages.createdAt.equals(c.createdAt) &
244+
messages.id.isBiggerOrEqualValue(c.id)),
245+
);
246+
}
196247

197-
if (msgList.isNotEmpty) {
198-
if (messagePagination?.lessThan != null) {
199-
final lessThanIndex = msgList.indexWhere(
200-
(m) => m.id == messagePagination!.lessThan,
201-
);
202-
if (lessThanIndex != -1) {
203-
msgList.removeRange(lessThanIndex, msgList.length);
204-
}
205-
}
206-
if (messagePagination?.greaterThan != null) {
207-
final greaterThanIndex = msgList.indexWhere(
208-
(m) => m.id == messagePagination!.greaterThan,
209-
);
210-
if (greaterThanIndex != -1) {
211-
msgList.removeRange(0, greaterThanIndex);
212-
}
213-
}
214-
if (messagePagination?.limit != null) {
215-
return msgList
216-
.skip(max(0, msgList.length - messagePagination!.limit))
217-
.toList();
218-
}
248+
if (messagePagination != null) {
249+
query.limit(messagePagination.limit);
219250
}
220-
return msgList;
251+
252+
final rows = await query.get();
253+
final orderedRows = isForwardPagination ? rows : rows.reversed.toList();
254+
255+
return Future.wait(
256+
orderedRows
257+
.map((row) => _messageFromJoinRow(row, fetchDraft: fetchDraft)),
258+
);
221259
}
222260

223261
/// Updates the message data of a particular channel with
@@ -241,4 +279,22 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
241279
(batch) => batch.insertAllOnConflictUpdate(messages, entities),
242280
);
243281
}
282+
283+
/// Returns the `(createdAt, id)` cursor for the message with [id] in the
284+
/// local cache, or `null` if [id] is null, the message isn't cached, or
285+
/// isn't visible in the channel (i.e. a thread reply with
286+
/// `showInChannel = false`).
287+
Future<({DateTime createdAt, String id})?> _lookupCursor(String? id) async {
288+
if (id == null) return null;
289+
final createdAt = await (selectOnly(messages)
290+
..addColumns([messages.createdAt])
291+
..where(messages.id.equals(id))
292+
..where(
293+
messages.parentId.isNull() | messages.showInChannel.equals(true),
294+
))
295+
.map((row) => row.read(messages.createdAt))
296+
.getSingleOrNull();
297+
if (createdAt == null) return null;
298+
return (createdAt: createdAt, id: id);
299+
}
244300
}

packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor<DriftChatDatabase>
1616

1717
/// Returns all the reactions of a particular message by matching
1818
/// [Reactions.messageId] with [messageId]
19-
Future<List<Reaction>> getReactions(String messageId) =>
20-
(select(pinnedMessageReactions).join([
21-
leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)),
22-
])
23-
..where(pinnedMessageReactions.messageId.equals(messageId))
24-
..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)]))
25-
.map((rows) {
26-
final userEntity = rows.readTableOrNull(users);
27-
final reactionEntity = rows.readTable(pinnedMessageReactions);
28-
return reactionEntity.toReaction(user: userEntity?.toUser());
29-
}).get();
19+
Future<List<Reaction>> getReactions(String messageId) {
20+
final where = pinnedMessageReactions.messageId.equals(messageId);
21+
return _selectReactions(where);
22+
}
3023

3124
/// Returns all the reactions of a particular message
3225
/// added by a particular user by matching
@@ -35,9 +28,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor<DriftChatDatabase>
3528
Future<List<Reaction>> getReactionsByUserId(
3629
String messageId,
3730
String userId,
38-
) async {
39-
final reactions = await getReactions(messageId);
40-
return reactions.where((it) => it.userId == userId).toList();
31+
) {
32+
final where = pinnedMessageReactions.messageId.equals(messageId) &
33+
pinnedMessageReactions.userId.equals(userId);
34+
return _selectReactions(where);
4135
}
4236

4337
/// Updates the reactions data with the new [reactionList] data
@@ -57,4 +51,17 @@ class PinnedMessageReactionDao extends DatabaseAccessor<DriftChatDatabase>
5751
(r) => r.messageId.isIn(messageIds),
5852
);
5953
});
54+
55+
Future<List<Reaction>> _selectReactions(Expression<bool> where) {
56+
final rows = select(pinnedMessageReactions).join([
57+
leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)),
58+
])
59+
..where(where)
60+
..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)]);
61+
return rows.map((row) {
62+
final reactionEntity = row.readTable(pinnedMessageReactions);
63+
final userEntity = row.readTableOrNull(users);
64+
return reactionEntity.toReaction(user: userEntity?.toUser());
65+
}).get();
66+
}
6067
}

packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,10 @@ class ReactionDao extends DatabaseAccessor<DriftChatDatabase>
1616

1717
/// Returns all the reactions of a particular message by matching
1818
/// [Reactions.messageId] with [messageId]
19-
Future<List<Reaction>> getReactions(String messageId) =>
20-
(select(reactions).join([
21-
leftOuterJoin(users, reactions.userId.equalsExp(users.id)),
22-
])
23-
..where(reactions.messageId.equals(messageId))
24-
..orderBy([OrderingTerm.asc(reactions.createdAt)]))
25-
.map((rows) {
26-
final userEntity = rows.readTableOrNull(users);
27-
final reactionEntity = rows.readTable(reactions);
28-
return reactionEntity.toReaction(user: userEntity?.toUser());
29-
}).get();
19+
Future<List<Reaction>> getReactions(String messageId) {
20+
final where = reactions.messageId.equals(messageId);
21+
return _selectReactions(where);
22+
}
3023

3124
/// Returns all the reactions of a particular message
3225
/// added by a particular user by matching
@@ -35,9 +28,10 @@ class ReactionDao extends DatabaseAccessor<DriftChatDatabase>
3528
Future<List<Reaction>> getReactionsByUserId(
3629
String messageId,
3730
String userId,
38-
) async {
39-
final reactions = await getReactions(messageId);
40-
return reactions.where((it) => it.userId == userId).toList();
31+
) {
32+
final where =
33+
reactions.messageId.equals(messageId) & reactions.userId.equals(userId);
34+
return _selectReactions(where);
4135
}
4236

4337
/// Updates the reactions data with the new [reactionList] data
@@ -57,4 +51,16 @@ class ReactionDao extends DatabaseAccessor<DriftChatDatabase>
5751
(r) => r.messageId.isIn(messageIds),
5852
);
5953
});
54+
55+
Future<List<Reaction>> _selectReactions(Expression<bool> where) {
56+
final rows = select(reactions)
57+
.join([leftOuterJoin(users, reactions.userId.equalsExp(users.id))])
58+
..where(where)
59+
..orderBy([OrderingTerm.asc(reactions.createdAt)]);
60+
return rows.map((row) {
61+
final reactionEntity = row.readTable(reactions);
62+
final userEntity = row.readTableOrNull(users);
63+
return reactionEntity.toReaction(user: userEntity?.toUser());
64+
}).get();
65+
}
6066
}

0 commit comments

Comments
 (0)