Skip to content

Commit 3b385f3

Browse files
committed
perf(llc): Move pinned messages pagination to SQL
1 parent c04a7cf commit 3b385f3

3 files changed

Lines changed: 104 additions & 31 deletions

File tree

packages/stream_chat_persistence/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
- Reduce the number of DB reads in the `ChatPersistenceClient.getChannelStates` method.
66
- Read only the messages matching the `PaginationParams` from DB when calling `MessageDao.getMessagesByCid` instead of reading all messages for the channel and applying pagination in memory.
7+
- Read only the messages matching the `PaginationParams` from DB when calling `PinnedMessageDao.getMessagesByCid` instead of reading all pinned messages for the channel and applying pagination in memory.
78
- Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory.
89
- Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory.
910
- Improve the message read times from DB.
@@ -13,6 +14,8 @@
1314
- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`.
1415
- `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour.
1516
- `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot.
17+
- `PinnedMessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`.
18+
- `PinnedMessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail.
1619

1720

1821
## 9.24.0

packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart

Lines changed: 90 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,38 @@ class PinnedMessageDao extends DatabaseAccessor<DriftChatDatabase>
242242
bool fetchDraft = true,
243243
PaginationParams? messagePagination,
244244
}) async {
245+
final (
246+
lessThanCursor,
247+
lessThanOrEqualCursor,
248+
greaterThanCursor,
249+
greaterThanOrEqualCursor,
250+
) = await (
251+
_lookupCursor(messagePagination?.lessThan),
252+
_lookupCursor(messagePagination?.lessThanOrEqual),
253+
_lookupCursor(messagePagination?.greaterThan),
254+
_lookupCursor(messagePagination?.greaterThanOrEqual),
255+
).wait;
256+
257+
// When the caller is paginating forward (greaterThan / greaterThanOrEqual
258+
// only), order ASC so the SQL `LIMIT` retains the N messages immediately
259+
// AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N most
260+
// recent (closest to a `lessThan` cursor, or the channel tail when no
261+
// cursor is set). The final result is always reshaped to ASC for display.
262+
final isForwardPagination =
263+
(greaterThanCursor != null || greaterThanOrEqualCursor != null) &&
264+
lessThanCursor == null &&
265+
lessThanOrEqualCursor == null;
266+
267+
final orderBy = isForwardPagination
268+
? [
269+
OrderingTerm.asc(pinnedMessages.createdAt),
270+
OrderingTerm.asc(pinnedMessages.id),
271+
]
272+
: [
273+
OrderingTerm.desc(pinnedMessages.createdAt),
274+
OrderingTerm.desc(pinnedMessages.id),
275+
];
276+
245277
final query = select(pinnedMessages).join([
246278
leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)),
247279
leftOuterJoin(
@@ -252,35 +284,48 @@ class PinnedMessageDao extends DatabaseAccessor<DriftChatDatabase>
252284
..where(pinnedMessages.channelCid.equals(cid))
253285
..where(pinnedMessages.parentId.isNull() |
254286
pinnedMessages.showInChannel.equals(true))
255-
..orderBy([OrderingTerm.asc(pinnedMessages.createdAt)]);
287+
..orderBy(orderBy);
256288

257-
final rows = await query.get();
258-
final msgList = await _messagesFromJoinRows(rows, fetchDraft: fetchDraft);
289+
// Cursor predicates compare the full `(createdAt, id)` tuple — the same
290+
// key used in ORDER BY — so messages sharing a `createdAt` with the cursor
291+
// fall on the correct side of the boundary. Filtering on `createdAt` alone
292+
// would skip or repeat those siblings across pages.
293+
if (lessThanCursor case final c?) {
294+
query.where(
295+
pinnedMessages.createdAt.isSmallerThanValue(c.createdAt) |
296+
(pinnedMessages.createdAt.equals(c.createdAt) &
297+
pinnedMessages.id.isSmallerThanValue(c.id)),
298+
);
299+
}
300+
if (lessThanOrEqualCursor case final c?) {
301+
query.where(
302+
pinnedMessages.createdAt.isSmallerThanValue(c.createdAt) |
303+
(pinnedMessages.createdAt.equals(c.createdAt) &
304+
pinnedMessages.id.isSmallerOrEqualValue(c.id)),
305+
);
306+
}
307+
if (greaterThanCursor case final c?) {
308+
query.where(
309+
pinnedMessages.createdAt.isBiggerThanValue(c.createdAt) |
310+
(pinnedMessages.createdAt.equals(c.createdAt) &
311+
pinnedMessages.id.isBiggerThanValue(c.id)),
312+
);
313+
}
314+
if (greaterThanOrEqualCursor case final c?) {
315+
query.where(
316+
pinnedMessages.createdAt.isBiggerThanValue(c.createdAt) |
317+
(pinnedMessages.createdAt.equals(c.createdAt) &
318+
pinnedMessages.id.isBiggerOrEqualValue(c.id)),
319+
);
320+
}
259321

260-
if (msgList.isNotEmpty) {
261-
final mutable = msgList.toList();
262-
if (messagePagination?.lessThan != null) {
263-
final lessThanIndex = mutable.indexWhere(
264-
(m) => m.id == messagePagination!.lessThan,
265-
);
266-
if (lessThanIndex != -1) {
267-
mutable.removeRange(lessThanIndex, mutable.length);
268-
}
269-
}
270-
if (messagePagination?.greaterThan != null) {
271-
final greaterThanIndex = mutable.indexWhere(
272-
(m) => m.id == messagePagination!.greaterThan,
273-
);
274-
if (greaterThanIndex != -1) {
275-
mutable.removeRange(0, greaterThanIndex);
276-
}
277-
}
278-
if (messagePagination?.limit != null) {
279-
return mutable.take(messagePagination!.limit).toList();
280-
}
281-
return mutable;
322+
if (messagePagination != null) {
323+
query.limit(messagePagination.limit);
282324
}
283-
return msgList;
325+
326+
final rows = await query.get();
327+
final orderedRows = isForwardPagination ? rows : rows.reversed.toList();
328+
return _messagesFromJoinRows(orderedRows, fetchDraft: fetchDraft);
284329
}
285330

286331
/// Updates the message data of a particular channel with
@@ -304,4 +349,23 @@ class PinnedMessageDao extends DatabaseAccessor<DriftChatDatabase>
304349
(batch) => batch.insertAllOnConflictUpdate(pinnedMessages, entities),
305350
);
306351
}
352+
353+
/// Returns the `(createdAt, id)` cursor for the pinned message with [id] in
354+
/// the local cache, or `null` if [id] is null, the message isn't cached, or
355+
/// isn't visible in the channel (i.e. a thread reply with
356+
/// `showInChannel = false`).
357+
Future<({DateTime createdAt, String id})?> _lookupCursor(String? id) async {
358+
if (id == null) return null;
359+
final createdAt = await (selectOnly(pinnedMessages)
360+
..addColumns([pinnedMessages.createdAt])
361+
..where(pinnedMessages.id.equals(id))
362+
..where(
363+
pinnedMessages.parentId.isNull() |
364+
pinnedMessages.showInChannel.equals(true),
365+
))
366+
.map((row) => row.read(pinnedMessages.createdAt))
367+
.getSingleOrNull();
368+
if (createdAt == null) return null;
369+
return (createdAt: createdAt, id: id);
370+
}
307371
}

packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,20 @@ void main() {
2525
}) async {
2626
final channels = [ChannelModel(cid: cid)];
2727
final users = List.generate(count, (index) => User(id: 'testUserId$index'));
28+
// Strictly monotonic `createdAt` per message so SQL-side pagination
29+
// filters (`WHERE createdAt < cutoff`, `ORDER BY createdAt ASC`) can't be
30+
// confused by ties. Drift stores `DateTime` as integer Unix seconds by
31+
// default, so the offset must be at least 1 second per row — otherwise
32+
// sub-second offsets all round-trip onto the same second.
33+
final baseTime = DateTime.now();
2834
final messages = List.generate(
2935
count,
3036
(index) => Message(
3137
id: 'testMessageId$cid$index',
3238
type: 'testType',
3339
user: users[index],
3440
channelRole: 'channel_member',
35-
createdAt: DateTime.now(),
41+
createdAt: baseTime.add(Duration(seconds: index)),
3642
shadowed: math.Random().nextBool(),
3743
replyCount: index,
3844
updatedAt: DateTime.now(),
@@ -54,7 +60,7 @@ void main() {
5460
type: 'testType',
5561
user: users[index],
5662
channelRole: 'channel_member',
57-
createdAt: DateTime.now(),
63+
createdAt: baseTime.add(Duration(seconds: index)),
5864
shadowed: math.Random().nextBool(),
5965
replyCount: index,
6066
updatedAt: DateTime.now(),
@@ -75,7 +81,7 @@ void main() {
7581
channelRole: 'channel_member',
7682
parentId:
7783
mapAllThreadToFirstMessage ? messages[0].id : messages[index].id,
78-
createdAt: DateTime.now(),
84+
createdAt: baseTime.add(Duration(seconds: index)),
7985
shadowed: math.Random().nextBool(),
8086
replyCount: index,
8187
updatedAt: DateTime.now(),
@@ -380,8 +386,8 @@ void main() {
380386
messagePagination: pagination,
381387
);
382388
expect(fetchedMessages.length, limit);
383-
expect(fetchedMessages.first.id, greaterThan);
384-
expect(fetchedMessages.last.id != lessThan, true);
389+
expect(fetchedMessages.first.id, 'testMessageId${cid}10');
390+
expect(fetchedMessages.last.id, 'testMessageId${cid}24');
385391
});
386392

387393
test('updateMessages', () async {

0 commit comments

Comments
 (0)