Parent Issue
Part of #500 — depends on #501, used by #502
Context
The existing chat.py endpoint calls agent.chat(user_message) synchronously and returns a complete string response. For live interactive sessions, the frontend needs token-by-token streaming so the UI updates as the model generates. This issue builds a streaming adapter that wraps the Anthropic SDK streaming API and emits typed events for the WebSocket relay.
Existing Code to Build On
codeframe/core/adapters/claude_code.py — existing Claude Code adapter (subprocess-based)
codeframe/adapters/llm/ — existing LLM adapter layer
codeframe/core/streaming.py — EventPublisher async event distribution pattern
What to Build
New file: codeframe/core/adapters/streaming_chat.py
from dataclasses import dataclass
from typing import AsyncIterator
from enum import Enum
class ChatEventType(str, Enum):
TEXT_DELTA = "text_delta"
TOOL_USE_START = "tool_use_start"
TOOL_RESULT = "tool_result"
THINKING = "thinking"
COST_UPDATE = "cost_update"
DONE = "done"
ERROR = "error"
@dataclass
class ChatEvent:
type: ChatEventType
content: str | None = None
tool_name: str | None = None
tool_input: dict | None = None
cost_usd: float | None = None
input_tokens: int | None = None
output_tokens: int | None = None
class StreamingChatAdapter:
"""
Wraps the Anthropic SDK streaming API.
Yields ChatEvent objects as the model generates.
Supports interrupt via asyncio.Event.
Persists messages to session_messages table after each turn.
"""
def __init__(self, session_id: str, model: str, db_repo):
...
async def send_message(
self,
content: str,
history: list[dict],
interrupt_event: asyncio.Event | None = None,
) -> AsyncIterator[ChatEvent]:
"""
Stream a single turn.
Uses anthropic.AsyncAnthropic().messages.stream().
Yields ChatEvent per SDK event type:
- RawContentBlockDeltaEvent → TEXT_DELTA or THINKING
- RawContentBlockStartEvent (tool_use) → TOOL_USE_START
- tool execution result → TOOL_RESULT
- MessageStopEvent → COST_UPDATE + DONE
Checks interrupt_event between chunks; if set, stops and closes stream.
Persists user message and complete assistant turn to session_messages.
"""
...
Tool execution
For interactive sessions, run a limited safe tool set (read files, search, list directory). Do not execute shell commands or write files without explicit user confirmation — keep the scope conservative for v1.
Tools to support initially:
Read (read a file)
Glob (find files by pattern)
Grep (search file contents)
Each tool call yields TOOL_USE_START → executes → yields TOOL_RESULT.
Message history management
- Load
session_messages from DB at session start to reconstruct context
- Append new user message before the API call
- Append complete assistant response after the stream ends
- Truncate history if context window approaches limit (use
tiktoken already in deps)
Acceptance Criteria
Notes
- Use
anthropic.AsyncAnthropic() — do not use the synchronous client
- Model defaults to
claude-sonnet-4-6 if not specified in session
- Check
ANTHROPIC_API_KEY env var; raise clear error if missing
Parent Issue
Part of #500 — depends on #501, used by #502
Context
The existing
chat.pyendpoint callsagent.chat(user_message)synchronously and returns a complete string response. For live interactive sessions, the frontend needs token-by-token streaming so the UI updates as the model generates. This issue builds a streaming adapter that wraps the Anthropic SDK streaming API and emits typed events for the WebSocket relay.Existing Code to Build On
codeframe/core/adapters/claude_code.py— existing Claude Code adapter (subprocess-based)codeframe/adapters/llm/— existing LLM adapter layercodeframe/core/streaming.py—EventPublisherasync event distribution patternWhat to Build
New file:
codeframe/core/adapters/streaming_chat.pyTool execution
For interactive sessions, run a limited safe tool set (read files, search, list directory). Do not execute shell commands or write files without explicit user confirmation — keep the scope conservative for v1.
Tools to support initially:
Read(read a file)Glob(find files by pattern)Grep(search file contents)Each tool call yields
TOOL_USE_START→ executes → yieldsTOOL_RESULT.Message history management
session_messagesfrom DB at session start to reconstruct contexttiktokenalready in deps)Acceptance Criteria
StreamingChatAdapter.send_message()yieldsTEXT_DELTAevents as tokens arrive (not buffered)TOOL_USE_STARTwith tool name + input, thenTOOL_RESULTwith outputTHINKINGevents emitted when extended thinking is enabledCOST_UPDATEevent emitted at turn end with correct token countsinterrupt_event.set()causes the stream to stop within 1 turnsession_messagesafter each complete turntests/core/test_streaming_chat.pyusing mocked Anthropic clientNotes
anthropic.AsyncAnthropic()— do not use the synchronous clientclaude-sonnet-4-6if not specified in sessionANTHROPIC_API_KEYenv var; raise clear error if missing