Skip to content

Commit b447802

Browse files
sugyanclaude
andauthored
fix: unify message processing pipelines between streaming and history (#262)
* fix: unify message processing pipelines between streaming and history - Create UnifiedMessageProcessor class for consistent message handling - Refactor useStreamParser to use UnifiedProcessor - Update messageConversion to use UnifiedProcessor for batch processing - Remove obsolete useToolHandling.ts and related test mocks - Ensure TodoWrite displays identically in streaming and history views Resolves #258 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * fix: correct message ordering in batch processing - Fix incorrect use of unshift() that was breaking message order - Ensure proper order: thinking messages → tool messages → text messages - Address Copilot PR review feedback 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 45a87ef commit b447802

5 files changed

Lines changed: 601 additions & 482 deletions

File tree

frontend/src/hooks/streaming/useStreamParser.test.ts

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,6 @@ import type { SDKMessage } from "../../types";
66
import { generateId } from "../../utils/id";
77

88
// Mock dependencies
9-
vi.mock("../useMessageConverter", () => ({
10-
useMessageConverter: () => ({
11-
createSystemMessage: vi.fn((data) => ({ ...data, timestamp: Date.now() })),
12-
createToolMessage: vi.fn((data) => ({ ...data, timestamp: Date.now() })),
13-
createResultMessage: vi.fn((data) => ({ ...data, timestamp: Date.now() })),
14-
createToolResultMessage: vi.fn((data) => ({
15-
...data,
16-
timestamp: Date.now(),
17-
})),
18-
}),
19-
}));
20-
21-
vi.mock("./useToolHandling", () => ({
22-
useToolHandling: () => ({
23-
toolUseCache: {
24-
set: vi.fn(),
25-
get: vi.fn(),
26-
},
27-
processToolResult: vi.fn(),
28-
}),
29-
}));
309

3110
describe("useStreamParser", () => {
3211
let mockContext: StreamingContext;

frontend/src/hooks/streaming/useStreamParser.ts

Lines changed: 49 additions & 195 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { useCallback } from "react";
1+
import { useCallback, useMemo } from "react";
22
import type {
33
StreamResponse,
44
SDKMessage,
@@ -11,232 +11,86 @@ import {
1111
isResultMessage,
1212
isUserMessage,
1313
} from "../../utils/messageTypes";
14-
import { useMessageConverter } from "../useMessageConverter";
15-
import { createTodoMessageFromInput } from "../../utils/messageConversion";
1614
import type { StreamingContext } from "./useMessageProcessor";
17-
import { useToolHandling } from "./useToolHandling";
18-
import { isThinkingContentItem } from "../../utils/messageTypes";
15+
import {
16+
UnifiedMessageProcessor,
17+
type ProcessingContext,
18+
} from "../../utils/UnifiedMessageProcessor";
1919

2020
export function useStreamParser() {
21-
const {
22-
createSystemMessage,
23-
createToolMessage,
24-
createResultMessage,
25-
createToolResultMessage,
26-
createThinkingMessage,
27-
} = useMessageConverter();
28-
29-
const { toolUseCache, processToolResult } = useToolHandling();
30-
31-
const handleSystemMessage = useCallback(
32-
(
33-
claudeData: Extract<SDKMessage, { type: "system" }>,
34-
context: StreamingContext,
35-
) => {
36-
// Check if this is an init message and if we should show it
37-
if (claudeData.subtype === "init") {
38-
// Mark that we've received init
39-
context.setHasReceivedInit?.(true);
40-
41-
const shouldShow = context.shouldShowInitMessage?.() ?? true;
42-
if (shouldShow) {
43-
const systemMessage = createSystemMessage(claudeData);
44-
context.addMessage(systemMessage);
45-
context.onInitMessageShown?.();
46-
}
47-
} else {
48-
// Always show non-init system messages
49-
const systemMessage = createSystemMessage(claudeData);
50-
context.addMessage(systemMessage);
51-
}
52-
},
53-
[createSystemMessage],
54-
);
55-
56-
const handleAssistantTextMessage = useCallback(
57-
(contentItem: { text?: string }, context: StreamingContext) => {
58-
let messageToUpdate = context.currentAssistantMessage;
59-
60-
if (!messageToUpdate) {
61-
messageToUpdate = {
62-
type: "chat",
63-
role: "assistant",
64-
content: "",
65-
timestamp: Date.now(),
66-
};
67-
context.setCurrentAssistantMessage(messageToUpdate);
68-
context.addMessage(messageToUpdate);
69-
}
70-
71-
const updatedContent =
72-
(messageToUpdate.content || "") + (contentItem.text || "");
73-
74-
// Update the current assistant message state
75-
const updatedMessage = {
76-
...messageToUpdate,
77-
content: updatedContent,
21+
// Create a single unified processor instance
22+
const processor = useMemo(() => new UnifiedMessageProcessor(), []);
23+
24+
// Convert StreamingContext to ProcessingContext
25+
const adaptContext = useCallback(
26+
(context: StreamingContext): ProcessingContext => {
27+
return {
28+
// Core message handling
29+
addMessage: context.addMessage,
30+
updateLastMessage: context.updateLastMessage,
31+
32+
// Current assistant message state
33+
currentAssistantMessage: context.currentAssistantMessage,
34+
setCurrentAssistantMessage: context.setCurrentAssistantMessage,
35+
36+
// Session handling
37+
onSessionId: context.onSessionId,
38+
hasReceivedInit: context.hasReceivedInit,
39+
setHasReceivedInit: context.setHasReceivedInit,
40+
41+
// Init message handling
42+
shouldShowInitMessage: context.shouldShowInitMessage,
43+
onInitMessageShown: context.onInitMessageShown,
44+
45+
// Permission/Error handling
46+
onPermissionError: context.onPermissionError,
47+
onAbortRequest: context.onAbortRequest,
7848
};
79-
context.setCurrentAssistantMessage(updatedMessage);
80-
context.updateLastMessage(updatedContent);
8149
},
8250
[],
8351
);
8452

85-
const handleThinkingMessage = useCallback(
86-
(contentItem: { thinking: string }, context: StreamingContext) => {
87-
const thinkingMessage = createThinkingMessage(contentItem.thinking);
88-
context.addMessage(thinkingMessage);
89-
},
90-
[createThinkingMessage],
91-
);
92-
93-
const handleToolUseMessage = useCallback(
94-
(
95-
contentItem: {
96-
id?: string;
97-
name?: string;
98-
input?: Record<string, unknown>;
99-
},
100-
context: StreamingContext,
101-
) => {
102-
// Cache tool_use information for later permission error handling
103-
if (contentItem.id && contentItem.name) {
104-
toolUseCache.set(
105-
contentItem.id,
106-
contentItem.name,
107-
contentItem.input || {},
108-
);
109-
}
110-
111-
// Special handling for ExitPlanMode - create plan message instead of tool message
112-
if (contentItem.name === "ExitPlanMode") {
113-
const planContent = (contentItem.input?.plan as string) || "";
114-
const planMessage = {
115-
type: "plan" as const,
116-
plan: planContent,
117-
toolUseId: contentItem.id || "",
118-
timestamp: Date.now(),
119-
};
120-
context.addMessage(planMessage);
121-
} else if (contentItem.name === "TodoWrite") {
122-
// Special handling for TodoWrite - create todo message from input
123-
const todoMessage = createTodoMessageFromInput(contentItem.input || {});
124-
if (todoMessage) {
125-
context.addMessage(todoMessage);
126-
} else {
127-
// Fallback to regular tool message if todo parsing fails
128-
const toolMessage = createToolMessage(contentItem);
129-
context.addMessage(toolMessage);
130-
}
131-
} else {
132-
const toolMessage = createToolMessage(contentItem);
133-
context.addMessage(toolMessage);
134-
}
135-
},
136-
[createToolMessage, toolUseCache],
137-
);
138-
139-
const handleAssistantMessage = useCallback(
140-
(
141-
claudeData: Extract<SDKMessage, { type: "assistant" }>,
142-
context: StreamingContext,
143-
) => {
144-
for (const contentItem of claudeData.message.content) {
145-
if (isThinkingContentItem(contentItem)) {
146-
handleThinkingMessage(contentItem, context);
147-
} else if (contentItem.type === "text") {
148-
handleAssistantTextMessage(contentItem, context);
149-
} else if (contentItem.type === "tool_use") {
150-
handleToolUseMessage(contentItem, context);
151-
}
152-
}
153-
},
154-
[handleThinkingMessage, handleAssistantTextMessage, handleToolUseMessage],
155-
);
156-
157-
const handleResultMessage = useCallback(
158-
(
159-
claudeData: Extract<SDKMessage, { type: "result" }>,
160-
context: StreamingContext,
161-
) => {
162-
const resultMessage = createResultMessage(claudeData);
163-
context.addMessage(resultMessage);
164-
context.setCurrentAssistantMessage(null);
165-
},
166-
[createResultMessage],
167-
);
168-
169-
const handleUserMessage = useCallback(
170-
(
171-
claudeData: Extract<SDKMessage, { type: "user" }>,
172-
context: StreamingContext,
173-
) => {
174-
// Check if this user message contains tool_result content
175-
const messageContent = claudeData.message.content;
176-
177-
if (Array.isArray(messageContent)) {
178-
for (const contentItem of messageContent) {
179-
if (contentItem.type === "tool_result") {
180-
processToolResult(contentItem, context, createToolResultMessage);
181-
}
182-
}
183-
}
184-
// Note: We don't display regular user messages from the SDK as they represent Claude's internal tool results
185-
},
186-
[createToolResultMessage, processToolResult],
187-
);
188-
18953
const processClaudeData = useCallback(
19054
(claudeData: SDKMessage, context: StreamingContext) => {
191-
// Update sessionId only for the first assistant message after init
192-
if (
193-
claudeData.type === "assistant" &&
194-
context.hasReceivedInit &&
195-
claudeData.session_id &&
196-
context.onSessionId
197-
) {
198-
context.onSessionId(claudeData.session_id);
199-
}
55+
const processingContext = adaptContext(context);
20056

57+
// Validate message types before processing
20158
switch (claudeData.type) {
20259
case "system":
203-
if (isSystemMessage(claudeData)) {
204-
handleSystemMessage(claudeData, context);
205-
} else {
60+
if (!isSystemMessage(claudeData)) {
20661
console.warn("Invalid system message:", claudeData);
62+
return;
20763
}
20864
break;
20965
case "assistant":
210-
if (isAssistantMessage(claudeData)) {
211-
handleAssistantMessage(claudeData, context);
212-
} else {
66+
if (!isAssistantMessage(claudeData)) {
21367
console.warn("Invalid assistant message:", claudeData);
68+
return;
21469
}
21570
break;
21671
case "result":
217-
if (isResultMessage(claudeData)) {
218-
handleResultMessage(claudeData, context);
219-
} else {
72+
if (!isResultMessage(claudeData)) {
22073
console.warn("Invalid result message:", claudeData);
74+
return;
22175
}
22276
break;
22377
case "user":
224-
if (isUserMessage(claudeData)) {
225-
handleUserMessage(claudeData, context);
226-
} else {
78+
if (!isUserMessage(claudeData)) {
22779
console.warn("Invalid user message:", claudeData);
80+
return;
22881
}
22982
break;
23083
default:
23184
console.log("Unknown Claude message type:", claudeData);
85+
return;
23286
}
87+
88+
// Process the message using the unified processor
89+
processor.processMessage(claudeData, processingContext, {
90+
isStreaming: true,
91+
});
23392
},
234-
[
235-
handleSystemMessage,
236-
handleAssistantMessage,
237-
handleResultMessage,
238-
handleUserMessage,
239-
],
93+
[processor, adaptContext],
24094
);
24195

24296
const processStreamLine = useCallback(

0 commit comments

Comments
 (0)