Skip to content

Commit 9e0808d

Browse files
authored
Merge pull request #432 from Mng-dev-ai/fix/cross-chat-stream-cache-writes
Fix off-screen stream cache writes targeting wrong chat
2 parents b6603bb + bf2f5b9 commit 9e0808d

1 file changed

Lines changed: 64 additions & 27 deletions

File tree

frontend/src/hooks/useStreamCallbacks.ts

Lines changed: 64 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,31 @@ import type { PaginatedMessages } from '@/types/api.types';
2727
// and the query cache. 130ms collects a visible chunk of text per paint cycle.
2828
const STREAM_FLUSH_INTERVAL_MS = 130;
2929

30+
// Cross-chat cache mutators: unlike the hook-scoped useMessageCache (which
31+
// closes over the currently viewed chatId), these target a specific chat by
32+
// explicit parameter — needed when off-screen streams flush or finalize into
33+
// a chat the user has navigated away from.
34+
function updateMessageInCacheForChat(
35+
queryClient: QueryClient,
36+
chatId: string,
37+
messageId: string,
38+
updater: (msg: Message) => Message,
39+
) {
40+
queryClient.setQueryData(
41+
queryKeys.messages(chatId),
42+
(oldData: { pages: PaginatedMessages[]; pageParams: unknown[] } | undefined) => {
43+
if (!oldData?.pages) return oldData;
44+
return {
45+
...oldData,
46+
pages: oldData.pages.map((page: PaginatedMessages) => ({
47+
...page,
48+
items: page.items.map((msg: Message) => (msg.id === messageId ? updater(msg) : msg)),
49+
})),
50+
};
51+
},
52+
);
53+
}
54+
3055
function findMessageInCache(
3156
queryClient: QueryClient,
3257
chatId: string,
@@ -244,10 +269,10 @@ export function useStreamCallbacks({
244269
}
245270

246271
if (writeToCache) {
247-
updateMessageInCache(session.messageId, update);
272+
updateMessageInCacheForChat(queryClient, session.chatId, session.messageId, update);
248273
}
249274
},
250-
[setMessages, updateMessageInCache],
275+
[setMessages, queryClient],
251276
);
252277

253278
// Coalescing timer: only one pending flush per stream. Multiple envelopes arriving
@@ -330,30 +355,13 @@ export function useStreamCallbacks({
330355

331356
// Flush any pending content to the query cache before clearing,
332357
// so content already cursor-acked in chatStorage is not lost on unmount.
333-
// Write directly via queryClient using session.chatId — the hook-level
334-
// updateMessageInCache is scoped to the current chatId, which may differ
335-
// from the session's owning chat after cross-chat navigation.
336358
for (const [streamId, timer] of flushTimers.entries()) {
337359
clearTimeout(timer);
338360
const buffer = buffers.get(streamId);
339361
const session = streamSessions.get(streamId);
340362
if (buffer && session) {
341363
const update = buildContentFlushUpdate(streamId, buffer, session);
342-
queryClient.setQueryData(
343-
queryKeys.messages(session.chatId),
344-
(oldData: { pages: PaginatedMessages[]; pageParams: unknown[] } | undefined) => {
345-
if (!oldData?.pages) return oldData;
346-
return {
347-
...oldData,
348-
pages: oldData.pages.map((page: PaginatedMessages) => ({
349-
...page,
350-
items: page.items.map((msg: Message) =>
351-
msg.id === session.messageId ? update(msg) : msg,
352-
),
353-
})),
354-
};
355-
},
356-
);
364+
updateMessageInCacheForChat(queryClient, session.chatId, session.messageId, update);
357365
}
358366
}
359367
flushTimers.clear();
@@ -512,6 +520,10 @@ export function useStreamCallbacks({
512520
const resolvedStreamId = streamId ?? findStreamIdByMessage(messageId);
513521
const isCancelled = terminalKind === 'cancelled';
514522
const isCurrentChat = chatId === chatIdRef.current;
523+
// Capture before clearStreamSession deletes it
524+
const sessionChatId = resolvedStreamId
525+
? streamSessionsRef.current.get(resolvedStreamId)?.chatId
526+
: undefined;
515527

516528
if (resolvedStreamId) {
517529
flushBufferedContent(resolvedStreamId, { writeToCache: true });
@@ -528,7 +540,10 @@ export function useStreamCallbacks({
528540
active_stream_id: null,
529541
stream_status: isCancelled ? 'interrupted' : 'completed',
530542
});
531-
updateMessageInCache(messageId, finalizeMessage);
543+
const targetChatId = sessionChatId ?? chatId;
544+
if (targetChatId) {
545+
updateMessageInCacheForChat(queryClient, targetChatId, messageId, finalizeMessage);
546+
}
532547
if (isCurrentChat) {
533548
setMessages((prev) =>
534549
prev.map((msg) => (msg.id === messageId ? finalizeMessage(msg) : msg)),
@@ -593,14 +608,16 @@ export function useStreamCallbacks({
593608
setPendingUserMessageId,
594609
setStreamState,
595610
settings?.notifications_enabled,
596-
updateMessageInCache,
597611
],
598612
);
599613

600614
const onError = useCallback(
601615
(streamError: Error, assistantMessageId?: string, streamId?: string) => {
602616
const resolvedStreamId = streamId ?? findStreamIdByMessage(assistantMessageId);
603617
const isCurrentChat = chatId === chatIdRef.current;
618+
const sessionChatId = resolvedStreamId
619+
? streamSessionsRef.current.get(resolvedStreamId)?.chatId
620+
: undefined;
604621

605622
if (resolvedStreamId) {
606623
flushBufferedContent(resolvedStreamId, { writeToCache: true });
@@ -616,7 +633,10 @@ export function useStreamCallbacks({
616633
active_stream_id: null,
617634
stream_status: 'failed',
618635
});
619-
updateMessageInCache(assistantMessageId, markFailed);
636+
const targetChatId = sessionChatId ?? chatId;
637+
if (targetChatId) {
638+
updateMessageInCacheForChat(queryClient, targetChatId, assistantMessageId, markFailed);
639+
}
620640
if (isCurrentChat) {
621641
setMessages((prev) =>
622642
prev.map((msg) => (msg.id === assistantMessageId ? markFailed(msg) : msg)),
@@ -635,7 +655,7 @@ export function useStreamCallbacks({
635655
flushBufferedContent,
636656
chatId,
637657
clearStreamSession,
638-
updateMessageInCache,
658+
queryClient,
639659
findStreamIdByMessage,
640660
setCurrentMessageId,
641661
setError,
@@ -696,8 +716,25 @@ export function useStreamCallbacks({
696716

697717
// Cache updates must run even for off-screen chats so returning
698718
// within the staleTime window shows the queued continuation messages.
699-
addMessageToCache(userMessage);
700-
addMessageToCache(assistantMessage);
719+
// Batch both messages into a single setQueryData call to avoid double
720+
// cache churn and subscriber notifications.
721+
queryClient.setQueryData(
722+
queryKeys.messages(chatId),
723+
(oldData: { pages: PaginatedMessages[]; pageParams: unknown[] } | undefined) => {
724+
if (!oldData?.pages || oldData.pages.length === 0) return oldData;
725+
const items = [...oldData.pages[0].items];
726+
if (!items.some((msg) => msg.id === userMessage.id)) {
727+
items.unshift(userMessage);
728+
}
729+
if (!items.some((msg) => msg.id === assistantMessage.id)) {
730+
items.unshift(assistantMessage);
731+
}
732+
return {
733+
...oldData,
734+
pages: oldData.pages.map((page, idx) => (idx === 0 ? { ...page, items } : page)),
735+
};
736+
},
737+
);
701738

702739
if (!isCurrentChat) return;
703740

@@ -708,8 +745,8 @@ export function useStreamCallbacks({
708745
flushBufferedContent,
709746
chatId,
710747
clearStreamSession,
748+
queryClient,
711749
setMessages,
712-
addMessageToCache,
713750
setCurrentMessageId,
714751
],
715752
);

0 commit comments

Comments
 (0)