Skip to content

Commit c04a7cf

Browse files
committed
Merge branch 'refs/heads/feature/FLU-485_optimize_read_message_from_db' into feature/FLU-485_optimize_read_message_from_db_part2
2 parents cc443dc + fa6898e commit c04a7cf

3 files changed

Lines changed: 124 additions & 40 deletions

File tree

packages/stream_chat_persistence/CHANGELOG.md

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,9 @@
88
- Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory.
99
- Improve the message read times from DB.
1010

11-
✅ Added
12-
13-
- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`.
14-
1511
🐞 Fixed
1612

13+
- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`.
1714
- `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour.
1815
- `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot.
1916

packages/stream_chat_persistence/lib/src/dao/message_dao.dart

Lines changed: 48 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -249,27 +249,15 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
249249
PaginationParams? messagePagination,
250250
}) async {
251251
final (
252-
lessThanCutoff,
253-
lessThanOrEqualCutoff,
254-
greaterThanCutoff,
255-
greaterThanOrEqualCutoff,
252+
lessThanCursor,
253+
lessThanOrEqualCursor,
254+
greaterThanCursor,
255+
greaterThanOrEqualCursor,
256256
) = await (
257-
switch (messagePagination?.lessThan) {
258-
final id? => _lookupMessageCreatedAt(id),
259-
_ => Future<DateTime?>.value(),
260-
},
261-
switch (messagePagination?.lessThanOrEqual) {
262-
final id? => _lookupMessageCreatedAt(id),
263-
_ => Future<DateTime?>.value(),
264-
},
265-
switch (messagePagination?.greaterThan) {
266-
final id? => _lookupMessageCreatedAt(id),
267-
_ => Future<DateTime?>.value(),
268-
},
269-
switch (messagePagination?.greaterThanOrEqual) {
270-
final id? => _lookupMessageCreatedAt(id),
271-
_ => Future<DateTime?>.value(),
272-
},
257+
_lookupCursor(messagePagination?.lessThan),
258+
_lookupCursor(messagePagination?.lessThanOrEqual),
259+
_lookupCursor(messagePagination?.greaterThan),
260+
_lookupCursor(messagePagination?.greaterThanOrEqual),
273261
).wait;
274262

275263
// When the caller is paginating forward (greaterThan / greaterThanOrEqual
@@ -278,9 +266,9 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
278266
// recent (closest to a `lessThan` cursor, or the channel tail when no
279267
// cursor is set). The final result is always reshaped to ASC for display.
280268
final isForwardPagination =
281-
(greaterThanCutoff != null || greaterThanOrEqualCutoff != null) &&
282-
lessThanCutoff == null &&
283-
lessThanOrEqualCutoff == null;
269+
(greaterThanCursor != null || greaterThanOrEqualCursor != null) &&
270+
lessThanCursor == null &&
271+
lessThanOrEqualCursor == null;
284272

285273
final orderBy = isForwardPagination
286274
? [
@@ -303,17 +291,37 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
303291
..where(messages.parentId.isNull() | messages.showInChannel.equals(true))
304292
..orderBy(orderBy);
305293

306-
if (lessThanCutoff case final t?) {
307-
query.where(messages.createdAt.isSmallerThanValue(t));
294+
// Cursor predicates compare the full `(createdAt, id)` tuple — the same
295+
// key used in ORDER BY — so messages sharing a `createdAt` with the cursor
296+
// fall on the correct side of the boundary. Filtering on `createdAt` alone
297+
// would skip or repeat those siblings across pages.
298+
if (lessThanCursor case final c?) {
299+
query.where(
300+
messages.createdAt.isSmallerThanValue(c.createdAt) |
301+
(messages.createdAt.equals(c.createdAt) &
302+
messages.id.isSmallerThanValue(c.id)),
303+
);
308304
}
309-
if (lessThanOrEqualCutoff case final t?) {
310-
query.where(messages.createdAt.isSmallerOrEqualValue(t));
305+
if (lessThanOrEqualCursor case final c?) {
306+
query.where(
307+
messages.createdAt.isSmallerThanValue(c.createdAt) |
308+
(messages.createdAt.equals(c.createdAt) &
309+
messages.id.isSmallerOrEqualValue(c.id)),
310+
);
311311
}
312-
if (greaterThanCutoff case final t?) {
313-
query.where(messages.createdAt.isBiggerThanValue(t));
312+
if (greaterThanCursor case final c?) {
313+
query.where(
314+
messages.createdAt.isBiggerThanValue(c.createdAt) |
315+
(messages.createdAt.equals(c.createdAt) &
316+
messages.id.isBiggerThanValue(c.id)),
317+
);
314318
}
315-
if (greaterThanOrEqualCutoff case final t?) {
316-
query.where(messages.createdAt.isBiggerOrEqualValue(t));
319+
if (greaterThanOrEqualCursor case final c?) {
320+
query.where(
321+
messages.createdAt.isBiggerThanValue(c.createdAt) |
322+
(messages.createdAt.equals(c.createdAt) &
323+
messages.id.isBiggerOrEqualValue(c.id)),
324+
);
317325
}
318326

319327
if (messagePagination != null) {
@@ -347,17 +355,21 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
347355
);
348356
}
349357

350-
/// Returns the `createdAt` of the message with [id] in the local cache,
351-
/// or `null` if the message isn't cached or isn't visible in the channel
352-
/// (i.e. a thread reply with `showInChannel = false`).
353-
Future<DateTime?> _lookupMessageCreatedAt(String id) {
354-
return (selectOnly(messages)
358+
/// Returns the `(createdAt, id)` cursor for the message with [id] in the
359+
/// local cache, or `null` if [id] is null, the message isn't cached, or
360+
/// isn't visible in the channel (i.e. a thread reply with
361+
/// `showInChannel = false`).
362+
Future<({DateTime createdAt, String id})?> _lookupCursor(String? id) async {
363+
if (id == null) return null;
364+
final createdAt = await (selectOnly(messages)
355365
..addColumns([messages.createdAt])
356366
..where(messages.id.equals(id))
357367
..where(
358368
messages.parentId.isNull() | messages.showInChannel.equals(true),
359369
))
360370
.map((row) => row.read(messages.createdAt))
361371
.getSingleOrNull();
372+
if (createdAt == null) return null;
373+
return (createdAt: createdAt, id: id);
362374
}
363375
}

packages/stream_chat_persistence/test/src/dao/message_dao_test.dart

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,81 @@ void main() {
610610
expect(fetchedMessages.first.id, 'testMessageId${cid}5');
611611
expect(fetchedMessages.last.id, 'testMessageId${cid}14');
612612
});
613+
614+
test('cursor with tied createdAt does not skip or duplicate siblings',
615+
() async {
616+
// Three messages share an identical `createdAt`. The SQL ORDER BY uses
617+
// the `(createdAt, id)` tuple, so within the trio the relative order is
618+
// by id (lexicographic). A cursor at `msg_tieB` must split the trio
619+
// cleanly: `msg_tieA` lands on the "before" side, `msg_tieC` on the
620+
// "after" side. A `createdAt`-only WHERE predicate would collapse all
621+
// three into the cursor's bucket and drop or keep them together.
622+
final users = [User(id: 'tieUser')];
623+
await database.userDao.updateUsers(users);
624+
await database.channelDao.updateChannels([ChannelModel(cid: cid)]);
625+
626+
final tie = DateTime.now();
627+
final earlier = tie.subtract(const Duration(seconds: 1));
628+
final later = tie.add(const Duration(seconds: 1));
629+
630+
Message m(String id, DateTime t) => Message(
631+
id: id,
632+
user: users.first,
633+
createdAt: t,
634+
updatedAt: t,
635+
text: id,
636+
);
637+
638+
await messageDao.updateMessages(cid, [
639+
m('msg_pre', earlier),
640+
m('msg_tieA', tie),
641+
m('msg_tieB', tie),
642+
m('msg_tieC', tie),
643+
m('msg_post', later),
644+
]);
645+
646+
final before = await messageDao.getMessagesByCid(
647+
cid,
648+
messagePagination: const PaginationParams(
649+
limit: 100,
650+
lessThan: 'msg_tieB',
651+
),
652+
);
653+
expect(before.map((m) => m.id).toList(), ['msg_pre', 'msg_tieA']);
654+
655+
final after = await messageDao.getMessagesByCid(
656+
cid,
657+
messagePagination: const PaginationParams(
658+
limit: 100,
659+
greaterThan: 'msg_tieB',
660+
),
661+
);
662+
expect(after.map((m) => m.id).toList(), ['msg_tieC', 'msg_post']);
663+
664+
final atOrBefore = await messageDao.getMessagesByCid(
665+
cid,
666+
messagePagination: const PaginationParams(
667+
limit: 100,
668+
lessThanOrEqual: 'msg_tieB',
669+
),
670+
);
671+
expect(
672+
atOrBefore.map((m) => m.id).toList(),
673+
['msg_pre', 'msg_tieA', 'msg_tieB'],
674+
);
675+
676+
final atOrAfter = await messageDao.getMessagesByCid(
677+
cid,
678+
messagePagination: const PaginationParams(
679+
limit: 100,
680+
greaterThanOrEqual: 'msg_tieB',
681+
),
682+
);
683+
expect(
684+
atOrAfter.map((m) => m.id).toList(),
685+
['msg_tieB', 'msg_tieC', 'msg_post'],
686+
);
687+
});
613688
});
614689

615690
test('updateMessages', () async {

0 commit comments

Comments
 (0)