Skip to content

Commit 1ca4ea0

Browse files
oxoxDevclaude
andauthored
fix(chat): deduplicate assistant messages by subscribing to canonical events only (tinyhumansai#432) (tinyhumansai#439)
* fix(chat): deduplicate assistant messages by subscribing to canonical events only (tinyhumansai#432) The Rust core emits socket events with both snake_case and colon:case aliases via emit_with_aliases(). The frontend was subscribing to both, causing every chat event to fire twice and producing duplicate assistant messages. - Subscribe only to canonical snake_case events (tool_call, chat_segment, chat_done, chat_error) instead of both naming conventions - Add safety-net dedup layer in Conversations.tsx using a seen-events map with TTL to guard against any remaining edge cases - Add unit tests verifying only canonical events are processed Closes tinyhumansai#432 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * style: apply prettier formatting to chatService test Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0f57838 commit 1ca4ea0

3 files changed

Lines changed: 156 additions & 15 deletions

File tree

app/src/pages/Conversations.tsx

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,34 @@ const Conversations = () => {
229229
const autocompleteDebounceRef = useRef<number | null>(null);
230230
const autocompleteRequestSeqRef = useRef(0);
231231
const sendingTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
232+
const seenChatEventsRef = useRef<Map<string, number>>(new Map());
233+
234+
const markChatEventSeen = (key: string): boolean => {
235+
const now = Date.now();
236+
const cache = seenChatEventsRef.current;
237+
const ttlMs = 10 * 60_000;
238+
const maxEntries = 500;
239+
240+
if (cache.has(key)) return false;
241+
242+
cache.set(key, now);
243+
244+
// Prune old entries first.
245+
for (const [existingKey, timestamp] of cache) {
246+
if (now - timestamp > ttlMs) {
247+
cache.delete(existingKey);
248+
}
249+
}
250+
251+
// Keep bounded memory in long sessions.
252+
while (cache.size > maxEntries) {
253+
const oldest = cache.keys().next().value;
254+
if (!oldest) break;
255+
cache.delete(oldest);
256+
}
257+
258+
return true;
259+
};
232260

233261
const getAudioExtension = (mimeType: string): string => {
234262
const lower = mimeType.toLowerCase();
@@ -400,6 +428,9 @@ const Conversations = () => {
400428

401429
const cleanup = subscribeChatEvents({
402430
onToolCall: (event: ChatToolCallEvent) => {
431+
const eventKey = `tool_call:${event.thread_id}:${event.request_id ?? 'none'}:${event.round}:${event.tool_name}`;
432+
if (!markChatEventSeen(eventKey)) return;
433+
403434
setToolTimelineByThread(prev => {
404435
const existing = prev[event.thread_id] ?? [];
405436
return {
@@ -417,6 +448,9 @@ const Conversations = () => {
417448
});
418449
},
419450
onToolResult: (event: ChatToolResultEvent) => {
451+
const eventKey = `tool_result:${event.thread_id}:${event.request_id ?? 'none'}:${event.round}:${event.tool_name}:${event.success}`;
452+
if (!markChatEventSeen(eventKey)) return;
453+
420454
setToolTimelineByThread(prev => {
421455
const existing = prev[event.thread_id] ?? [];
422456
if (existing.length === 0) return prev;
@@ -441,6 +475,9 @@ const Conversations = () => {
441475
});
442476
},
443477
onSegment: (event: ChatSegmentEvent) => {
478+
const eventKey = `segment:${event.thread_id}:${event.request_id}:${event.segment_index}`;
479+
if (!markChatEventSeen(eventKey)) return;
480+
444481
// Rust delivers segments with delays already applied — just dispatch.
445482
if (event.reaction_emoji) {
446483
const pending = pendingReactionRef.current.get(event.thread_id);
@@ -458,6 +495,9 @@ const Conversations = () => {
458495
dispatch(addInferenceResponse({ content: segmentText(event), threadId: event.thread_id }));
459496
},
460497
onDone: event => {
498+
const eventKey = `done:${event.thread_id}:${event.request_id ?? 'none'}`;
499+
if (!markChatEventSeen(eventKey)) return;
500+
461501
// Update tool timeline
462502
setToolTimelineByThread(prev => {
463503
const existing = prev[event.thread_id] ?? [];
@@ -508,6 +548,9 @@ const Conversations = () => {
508548
dispatch(setActiveThread(null));
509549
},
510550
onError: event => {
551+
const eventKey = `error:${event.thread_id}:${event.request_id ?? 'none'}:${event.error_type}:${event.message}`;
552+
if (!markChatEventSeen(eventKey)) return;
553+
511554
if (event.thread_id !== selectedThreadIdRef.current) return;
512555
if (sendingTimeoutRef.current) {
513556
clearTimeout(sendingTimeoutRef.current);
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import { beforeEach, describe, expect, it, vi } from 'vitest';
2+
3+
import { subscribeChatEvents } from '../chatService';
4+
import { socketService } from '../socketService';
5+
6+
vi.mock('../socketService', () => ({ socketService: { getSocket: vi.fn() } }));
7+
8+
type Handler = (...args: unknown[]) => void;
9+
10+
function createMockSocket() {
11+
const handlers = new Map<string, Handler[]>();
12+
const on = vi.fn((event: string, cb: Handler) => {
13+
const existing = handlers.get(event) ?? [];
14+
existing.push(cb);
15+
handlers.set(event, existing);
16+
});
17+
const off = vi.fn((event: string, cb: Handler) => {
18+
const existing = handlers.get(event) ?? [];
19+
handlers.set(
20+
event,
21+
existing.filter(handler => handler !== cb)
22+
);
23+
});
24+
const emit = (event: string, payload: unknown) => {
25+
for (const handler of handlers.get(event) ?? []) {
26+
handler(payload);
27+
}
28+
};
29+
30+
return { id: 'socket-1', on, off, emit };
31+
}
32+
33+
describe('chatService.subscribeChatEvents', () => {
34+
beforeEach(() => {
35+
vi.clearAllMocks();
36+
});
37+
38+
it('subscribes to canonical snake_case chat events only', () => {
39+
const socket = createMockSocket();
40+
vi.mocked(socketService.getSocket).mockReturnValue(socket as never);
41+
42+
subscribeChatEvents({
43+
onToolCall: () => {},
44+
onToolResult: () => {},
45+
onSegment: () => {},
46+
onDone: () => {},
47+
onError: () => {},
48+
});
49+
50+
const subscribedEvents = socket.on.mock.calls.map(call => call[0]);
51+
expect(subscribedEvents).toEqual([
52+
'tool_call',
53+
'tool_result',
54+
'chat_segment',
55+
'chat_done',
56+
'chat_error',
57+
]);
58+
expect(subscribedEvents).not.toContain('chat:tool_call');
59+
expect(subscribedEvents).not.toContain('chat:tool_result');
60+
expect(subscribedEvents).not.toContain('chat:segment');
61+
expect(subscribedEvents).not.toContain('chat:done');
62+
expect(subscribedEvents).not.toContain('chat:error');
63+
});
64+
65+
it('does not process alias events when only canonical subscriptions are active', () => {
66+
const socket = createMockSocket();
67+
vi.mocked(socketService.getSocket).mockReturnValue(socket as never);
68+
const onDone = vi.fn();
69+
70+
subscribeChatEvents({ onDone });
71+
72+
socket.emit('chat:done', { thread_id: 't1' });
73+
expect(onDone).not.toHaveBeenCalled();
74+
75+
socket.emit('chat_done', { thread_id: 't1' });
76+
expect(onDone).toHaveBeenCalledTimes(1);
77+
});
78+
79+
it('removes all handlers on cleanup', () => {
80+
const socket = createMockSocket();
81+
vi.mocked(socketService.getSocket).mockReturnValue(socket as never);
82+
83+
const cleanup = subscribeChatEvents({ onToolCall: () => {}, onDone: () => {} });
84+
cleanup();
85+
86+
const unsubscribedEvents = socket.off.mock.calls.map(call => call[0]);
87+
expect(unsubscribedEvents).toEqual(['tool_call', 'chat_done']);
88+
});
89+
});

app/src/services/chatService.ts

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { socketService } from './socketService';
1111

1212
export interface ChatToolCallEvent {
1313
thread_id: string;
14+
request_id?: string;
1415
tool_name: string;
1516
skill_id: string;
1617
args: Record<string, unknown>;
@@ -19,6 +20,7 @@ export interface ChatToolCallEvent {
1920

2021
export interface ChatToolResultEvent {
2122
thread_id: string;
23+
request_id?: string;
2224
tool_name: string;
2325
skill_id: string;
2426
output: string;
@@ -28,6 +30,7 @@ export interface ChatToolResultEvent {
2830

2931
export interface ChatDoneEvent {
3032
thread_id: string;
33+
request_id?: string;
3134
full_response: string;
3235
rounds_used: number;
3336
total_input_tokens: number;
@@ -60,6 +63,7 @@ export function segmentText(event: ChatSegmentEvent): string {
6063

6164
export interface ChatErrorEvent {
6265
thread_id: string;
66+
request_id?: string;
6367
message: string;
6468
error_type: 'network' | 'timeout' | 'tool_error' | 'inference' | 'cancelled';
6569
round: number | null;
@@ -78,40 +82,45 @@ export function subscribeChatEvents(listeners: ChatEventListeners): () => void {
7882
if (!socket) return () => {};
7983

8084
const handlers: Array<[string, (...args: unknown[]) => void]> = [];
85+
// Canonical convention for web-channel events is snake_case.
86+
// The core emits aliases for compatibility, but subscribing once avoids
87+
// processing the same logical event twice.
88+
const EVENTS = {
89+
toolCall: 'tool_call',
90+
toolResult: 'tool_result',
91+
segment: 'chat_segment',
92+
done: 'chat_done',
93+
error: 'chat_error',
94+
} as const;
8195

8296
if (listeners.onToolCall) {
8397
const cb = (payload: unknown) => listeners.onToolCall?.(payload as ChatToolCallEvent);
84-
socket.on('chat:tool_call', cb);
85-
socket.on('tool_call', cb);
86-
handlers.push(['chat:tool_call', cb], ['tool_call', cb]);
98+
socket.on(EVENTS.toolCall, cb);
99+
handlers.push([EVENTS.toolCall, cb]);
87100
}
88101

89102
if (listeners.onToolResult) {
90103
const cb = (payload: unknown) => listeners.onToolResult?.(payload as ChatToolResultEvent);
91-
socket.on('chat:tool_result', cb);
92-
socket.on('tool_result', cb);
93-
handlers.push(['chat:tool_result', cb], ['tool_result', cb]);
104+
socket.on(EVENTS.toolResult, cb);
105+
handlers.push([EVENTS.toolResult, cb]);
94106
}
95107

96108
if (listeners.onSegment) {
97109
const cb = (payload: unknown) => listeners.onSegment?.(payload as ChatSegmentEvent);
98-
socket.on('chat:segment', cb);
99-
socket.on('chat_segment', cb);
100-
handlers.push(['chat:segment', cb], ['chat_segment', cb]);
110+
socket.on(EVENTS.segment, cb);
111+
handlers.push([EVENTS.segment, cb]);
101112
}
102113

103114
if (listeners.onDone) {
104115
const cb = (payload: unknown) => listeners.onDone?.(payload as ChatDoneEvent);
105-
socket.on('chat:done', cb);
106-
socket.on('chat_done', cb);
107-
handlers.push(['chat:done', cb], ['chat_done', cb]);
116+
socket.on(EVENTS.done, cb);
117+
handlers.push([EVENTS.done, cb]);
108118
}
109119

110120
if (listeners.onError) {
111121
const cb = (payload: unknown) => listeners.onError?.(payload as ChatErrorEvent);
112-
socket.on('chat:error', cb);
113-
socket.on('chat_error', cb);
114-
handlers.push(['chat:error', cb], ['chat_error', cb]);
122+
socket.on(EVENTS.error, cb);
123+
handlers.push([EVENTS.error, cb]);
115124
}
116125

117126
return () => {

0 commit comments

Comments
 (0)