feat(core): StreamingChatAdapter for token-by-token Anthropic relay (#503)#512
Conversation
…503) Implements StreamingChatAdapter in codeframe/core/adapters/streaming_chat.py: - AsyncIterator[ChatEvent] over anthropic.AsyncAnthropic().messages.stream() - ChatEventType enum: TEXT_DELTA, TOOL_USE_START, TOOL_RESULT, THINKING, COST_UPDATE, DONE, ERROR - Read-only safe tool set: read_file, list_files, search_codebase - Internal tool-use loop: streams → executes tools → re-enters stream - Interrupt support via asyncio.Event checked between chunks - History load/persist via InteractiveSessionRepository.get_messages/add_message - tiktoken-based context truncation to 180k token budget - Cost estimation per turn Replaces the echo stub in session_chat_ws.py with a real adapter call. Adds 15 unit tests covering all event types, tool calls, interrupt, persistence, and error handling.
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
WalkthroughAdds a new StreamingChatAdapter that wraps Anthropic's async streaming API, emits typed ChatEvent objects (text deltas, thinking, tool lifecycle, cost, done, error), manages/truncates history, executes a safe tool subset, persists completed turns in background, and replaces the previous inline token-stub in the WebSocket router. Changes
Sequence DiagramsequenceDiagram
participant Client as WebSocket Client
participant Router as session_chat_ws Router
participant Adapter as StreamingChatAdapter
participant Anthropic as Anthropic SDK
participant ToolExec as Tool Executor
participant DB as Database
Client->>Router: send_message(content)
Router->>Router: obtain workspace_path
Router->>Adapter: new(session_id, db_repo, workspace_path)
Adapter->>DB: _load_history(session_id)
DB-->>Adapter: prior messages
Router->>Adapter: send_message(content, history)
Adapter->>Anthropic: AsyncAnthropic().messages.stream()
loop stream chunks
Anthropic-->>Adapter: RawContentBlockDeltaEvent
Adapter-->>Router: ChatEvent(TEXT_DELTA / THINKING)
Router->>Client: websocket event.to_dict()
alt tool use start
Anthropic-->>Adapter: RawContentBlockStartEvent(tool_use)
Adapter-->>Router: ChatEvent(TOOL_USE_START)
Router->>Client: tool_use_start event
Adapter->>ToolExec: execute safe tool (threadpool)
ToolExec-->>Adapter: tool output
Adapter-->>Router: ChatEvent(TOOL_RESULT)
Router->>Client: tool_result event
Adapter->>Anthropic: append tool result as user message (continue)
end
end
Anthropic-->>Adapter: MessageStopEvent (usage)
Adapter-->>Router: ChatEvent(COST_UPDATE)
Adapter->>DB: _persist_turn(user_msg, assistant_text) (background)
Adapter-->>Router: ChatEvent(DONE)
Router->>Client: done event
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
Code Review — StreamingChatAdapter (#512)Good implementation overall. Replacing the echo stub with a real streaming adapter is the right move, and the event-driven design with typed 🔴 Medium — History truncation can produce invalid API messages
# Current — drops messages one-by-one from the front:
while messages and _count(messages) > _MAX_HISTORY_TOKENS:
messages = messages[1:]Fix: always drop in pairs (user + the assistant reply that follows), or enforce that the result list starts with a user message: while messages and _count(messages) > _MAX_HISTORY_TOKENS:
# Drop a pair so we don't strand an assistant message at index 0
messages = messages[2:] if len(messages) >= 2 else messages[1:]
# Ensure first message is from the user
while messages and messages[0]["role"] != "user":
messages = messages[1:]🔴 Medium — No system prompt in
|
There was a problem hiding this comment.
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@codeframe/core/adapters/streaming_chat.py`:
- Around line 199-200: The adapter is fetching the default oldest 100 rows from
InteractiveSessionRepository; change the call to self._db_repo.get_messages(...)
to request the newest/full history (e.g., pass parameters to get_messages to set
a larger/no limit and order by created_at DESC or a “latest_first” flag) so you
receive the most recent turns, then convert/normalize the rows into
[{"role":..., "content":...}] and apply the existing local truncation method
(e.g., _truncate_history or equivalent) to trim to the desired window. Ensure
you reference get_messages, _db_repo, _session_id and the local truncation
method when making these changes.
- Around line 313-319: Persist the user turn to the DB immediately after
building/truncating the messages and before calling _stream_turn so the user’s
message survives cancellations or streaming/API failures; specifically, after
messages = self._truncate_history(messages) create and save a user-turn record
(same shape used by your persistence layer) and update any in-memory/history
list to include that persisted user turn, then call _stream_turn; modify
_stream_turn (and any code paths that currently persist the turn after
streaming) to only persist/update the assistant content after a
successful/completed turn (so partial/cancelled streams do not overwrite or
create incomplete assistant text), and ensure the same change is applied to the
similar block around the code at the second location (the 337–341 region)
referenced in the comment; keep _load_history and _truncate_history usage
unchanged except to reflect the persisted user turn in history.
- Around line 367-370: When handling the streaming loop (async for sdk_event in
stream) with an interrupt_event, the current branch returns immediately and
never emits the terminal 'DONE' event; change it to mirror the tool-execution
interrupt handling by sending/relaying a terminal DONE event (same event
shape/name used elsewhere) before returning so the relay and client receive a
clear end-of-turn signal; locate the interrupt check that references
interrupt_event/is_set() inside the stream loop and add the DONE emit/relay
(then return) just like the code path around the tool-execution interrupt at the
later branch.
- Around line 416-421: When rebuilding tool call outputs after
stream.get_final_message() (using final_msg, pending_tool_calls and
_rebuild_tool_inputs), ensure you append an assistant message containing the
tool_use blocks before appending the user message containing the tool_result
blocks; specifically, construct and insert an assistant message reflecting the
executed tool_use (derived from pending_tool_calls or final_msg content)
immediately prior to creating the user tool_result message so the message
history preserves the required assistant tool_use → user tool_result ordering
(update the logic around where final_msg is processed and where the tool_result
user message is appended).
In `@codeframe/ui/routers/session_chat_ws.py`:
- Around line 303-312: The code currently uses
Path(session.get("workspace_path") or ".") which falls back to the server CWD;
instead, validate that session.get("workspace_path") is present and reject the
request if it's missing. Modify the handler before calling
session_chat_manager.get_interrupt_event/_run_streaming_adapter to check
session.get("workspace_path") (or a dedicated variable), and if it is None/empty
send an error response or close the websocket with a clear error (e.g.,
raise/return an HTTP/WebSocket error) so no adapter_task is created with a "."
workspace_path; keep all references to session_id, interrupt_event,
db.interactive_sessions and _run_streaming_adapter unchanged except for gating
them behind this validation.
In `@tests/core/test_streaming_chat.py`:
- Around line 14-23: The test file imports unused symbols causing lint failures:
remove the unused imports pytest_asyncio and ChatEvent from
tests/core/test_streaming_chat.py; update the import block that currently
imports ChatEvent, ChatEventType, StreamingChatAdapter to only import
ChatEventType and StreamingChatAdapter (and remove the separate pytest_asyncio
import), then run the tests/linter to confirm Ruff passes.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f5e70dd3-7a60-45ed-8196-268ef16892f6
📒 Files selected for processing (4)
codeframe/core/adapters/__init__.pycodeframe/core/adapters/streaming_chat.pycodeframe/ui/routers/session_chat_ws.pytests/core/test_streaming_chat.py
| rows = self._db_repo.get_messages(self._session_id) | ||
| return [{"role": r["role"], "content": r["content"]} for r in rows] |
There was a problem hiding this comment.
Load the latest history window, not the first 100 rows.
InteractiveSessionRepository.get_messages() defaults to LIMIT 100 with ORDER BY created_at, so once a session grows past 100 messages this adapter will replay the oldest context and drop the most recent turns before _truncate_history() even runs. Ask the repo for the newest/full history here, then trim locally.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@codeframe/core/adapters/streaming_chat.py` around lines 199 - 200, The
adapter is fetching the default oldest 100 rows from
InteractiveSessionRepository; change the call to self._db_repo.get_messages(...)
to request the newest/full history (e.g., pass parameters to get_messages to set
a larger/no limit and order by created_at DESC or a “latest_first” flag) so you
receive the most recent turns, then convert/normalize the rows into
[{"role":..., "content":...}] and apply the existing local truncation method
(e.g., _truncate_history or equivalent) to trim to the desired window. Ensure
you reference get_messages, _db_repo, _session_id and the local truncation
method when making these changes.
| # Load history from DB if caller didn't supply it | ||
| if not history: | ||
| history = self._load_history() | ||
|
|
||
| # Build the message list for this turn | ||
| messages: list[dict] = list(history) + [{"role": "user", "content": content}] | ||
| messages = self._truncate_history(messages) |
There was a problem hiding this comment.
Persist the user turn before entering the stream.
The only DB write happens after _stream_turn() returns. _receive() in codeframe/ui/routers/session_chat_ws.py cancels any in-flight adapter before starting the next prompt, so cancelled or API-failed turns lose the user's message entirely, and interrupted turns can still write a partial assistant reply. Split this into “persist user upfront” and “persist assistant only after a completed turn.”
Also applies to: 337-341
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@codeframe/core/adapters/streaming_chat.py` around lines 313 - 319, Persist
the user turn to the DB immediately after building/truncating the messages and
before calling _stream_turn so the user’s message survives cancellations or
streaming/API failures; specifically, after messages =
self._truncate_history(messages) create and save a user-turn record (same shape
used by your persistence layer) and update any in-memory/history list to include
that persisted user turn, then call _stream_turn; modify _stream_turn (and any
code paths that currently persist the turn after streaming) to only
persist/update the assistant content after a successful/completed turn (so
partial/cancelled streams do not overwrite or create incomplete assistant text),
and ensure the same change is applied to the similar block around the code at
the second location (the 337–341 region) referenced in the comment; keep
_load_history and _truncate_history usage unchanged except to reflect the
persisted user turn in history.
| async for sdk_event in stream: | ||
| # Honour interrupt between chunks | ||
| if interrupt_event and interrupt_event.is_set(): | ||
| return |
There was a problem hiding this comment.
Emit DONE when the interrupt trips mid-stream.
This branch returns without a terminal event, unlike the tool-execution interrupt path at Lines 441-443. The relay keeps waiting and the client never gets a clear end-of-turn signal for the interrupted response.
🔚 Proposed fix
# Honour interrupt between chunks
if interrupt_event and interrupt_event.is_set():
- return
+ yield ChatEvent(type=ChatEventType.DONE)
+ return📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async for sdk_event in stream: | |
| # Honour interrupt between chunks | |
| if interrupt_event and interrupt_event.is_set(): | |
| return | |
| async for sdk_event in stream: | |
| # Honour interrupt between chunks | |
| if interrupt_event and interrupt_event.is_set(): | |
| yield ChatEvent(type=ChatEventType.DONE) | |
| return |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@codeframe/core/adapters/streaming_chat.py` around lines 367 - 370, When
handling the streaming loop (async for sdk_event in stream) with an
interrupt_event, the current branch returns immediately and never emits the
terminal 'DONE' event; change it to mirror the tool-execution interrupt handling
by sending/relaying a terminal DONE event (same event shape/name used elsewhere)
before returning so the relay and client receive a clear end-of-turn signal;
locate the interrupt check that references interrupt_event/is_set() inside the
stream loop and add the DONE emit/relay (then return) just like the code path
around the tool-execution interrupt at the later branch.
| final_msg = stream.get_final_message() | ||
| stop_reason = final_msg.stop_reason or "end_turn" | ||
|
|
||
| # Rebuild tool inputs from final message (more reliable than streaming) | ||
| if pending_tool_calls and hasattr(final_msg, "content"): | ||
| _rebuild_tool_inputs(final_msg.content, pending_tool_calls) |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In Anthropic's Messages API tool-use flow, must the follow-up request include the assistant message containing the tool_usecontent blocks immediately before the usertool_result blocks?
💡 Result:
Yes, in Anthropic's Messages API tool-use flow for client tools, the follow-up request must include the previous assistant message (containing the tool_use content blocks) immediately before the new user message with the tool_result blocks. This is required because the API is stateless, so every request includes the full conversation history, and the validation enforces that each tool_use block is followed directly by its corresponding tool_result in the subsequent user message without intervening messages. Official documentation states: "Tool result blocks must immediately follow their corresponding tool use blocks in the message history. You cannot include any messages between the assistant's tool use message and the user's tool result message." API errors explicitly confirm this: "tool_use ids were found without tool_result blocks immediately after... Each tool_use block must have a corresponding tool_result block in the next [message]."
Citations:
- 1: https://console.anthropic.com/docs/en/agents-and-tools/tool-use/handle-tool-calls
- 2: https://platform.claude.com/docs/en/agents-and-tools/tool-use/handle-tool-calls
🏁 Script executed:
# First, find and read the streaming_chat.py file
fd streaming_chat.pyRepository: frankbria/codeframe
Length of output: 138
🏁 Script executed:
# Read the file to verify the code at lines 416-421 and 466-469
cat -n codeframe/core/adapters/streaming_chat.py | sed -n '400,480p'Repository: frankbria/codeframe
Length of output: 4150
🏁 Script executed:
# Let me look at more context around the code to understand the data flow better
# Check what happens with the assistant message and tool_use blocks
cat -n codeframe/core/adapters/streaming_chat.py | sed -n '350,400p'Repository: frankbria/codeframe
Length of output: 2734
🏁 Script executed:
# Check what final_msg contains and how it's structured
# Look for how final_msg is used elsewhere in the file
rg -A 5 "final_msg\." codeframe/core/adapters/streaming_chat.py | head -50Repository: frankbria/codeframe
Length of output: 997
🏁 Script executed:
# Let's verify what fields are available on final_msg by checking the Message type
# Also check if there's an existing assistant message being built elsewhere
rg "role.*assistant" codeframe/core/adapters/streaming_chat.pyRepository: frankbria/codeframe
Length of output: 92
🏁 Script executed:
# Find the context where role="assistant" appears
rg -B 5 -A 5 'role="assistant"' codeframe/core/adapters/streaming_chat.pyRepository: frankbria/codeframe
Length of output: 393
🏁 Script executed:
# Check the Anthropic SDK to understand Message structure
# Look for imports and how messages are used
rg "from anthropic" codeframe/core/adapters/streaming_chat.pyRepository: frankbria/codeframe
Length of output: 111
🏁 Script executed:
# Let's check what the Message API returns and confirm final_msg has content with tool_use blocks
# Also check if there's any test that shows how messages should be structured
cat -n tests/core/test_streaming_chat.py | head -100Repository: frankbria/codeframe
Length of output: 3463
🏁 Script executed:
# Continue reading the test to see how messages are used and what the expected format is
cat -n tests/core/test_streaming_chat.py | sed -n '100,200p'Repository: frankbria/codeframe
Length of output: 4222
🏁 Script executed:
# Look for tests that involve tool calls and message structure
cat -n tests/core/test_streaming_chat.py | sed -n '200,350p'Repository: frankbria/codeframe
Length of output: 6376
🏁 Script executed:
# Continue reading the tool call test
cat -n tests/core/test_streaming_chat.py | sed -n '350,450p'Repository: frankbria/codeframe
Length of output: 4547
🏁 Script executed:
# Continue reading the persistence test to understand message appending
cat -n tests/core/test_streaming_chat.py | sed -n '450,550p'Repository: frankbria/codeframe
Length of output: 2023
Append the assistant tool_use message before the tool_result user message.
When the API returns tool_use blocks and you execute them, the follow-up request must include both the assistant message (containing the tool_use blocks) and the user message (containing the tool_result blocks). Currently, only the tool_result user message is appended at lines 467–469. The next API request will have ... tool_result with no preceding tool_use block, violating Anthropic's requirement that "tool_use blocks must have corresponding tool_result blocks immediately after" in the message history.
🔁 Proposed fix
while True:
# Track tool calls seen in this API turn for the follow-up message
pending_tool_calls: list[dict] = [] # {id, name, input, result}
active_tool: dict | None = None # buffering the current tool_use block
stop_reason = "end_turn"
+ assistant_content = None
@@
elif event_type == "message_stop":
@@
final_msg = stream.get_final_message()
stop_reason = final_msg.stop_reason or "end_turn"
+ assistant_content = getattr(final_msg, "content", None)
@@
# Append the tool results as a user message for the next turn
current_messages = current_messages + [
- {"role": "user", "content": tool_result_blocks}
+ {"role": "assistant", "content": assistant_content or []},
+ {"role": "user", "content": tool_result_blocks},
]Also applies to: 466–469
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@codeframe/core/adapters/streaming_chat.py` around lines 416 - 421, When
rebuilding tool call outputs after stream.get_final_message() (using final_msg,
pending_tool_calls and _rebuild_tool_inputs), ensure you append an assistant
message containing the tool_use blocks before appending the user message
containing the tool_result blocks; specifically, construct and insert an
assistant message reflecting the executed tool_use (derived from
pending_tool_calls or final_msg content) immediately prior to creating the user
tool_result message so the message history preserves the required assistant
tool_use → user tool_result ordering (update the logic around where final_msg is
processed and where the tool_result user message is appended).
| interrupt_event = await session_chat_manager.get_interrupt_event(session_id) | ||
| workspace_path = Path(session.get("workspace_path") or ".") | ||
| adapter_task[0] = asyncio.create_task( | ||
| _run_streaming_adapter(session_id, content, token_queue, interrupt_event) | ||
| _run_streaming_adapter( | ||
| session_id, | ||
| content, | ||
| token_queue, | ||
| interrupt_event, | ||
| db.interactive_sessions, | ||
| workspace_path, |
There was a problem hiding this comment.
Don’t fall back to the server CWD for workspace-scoped tools.
If session["workspace_path"] is missing, Path(... or ".") scopes read_file, list_files, and search_codebase to whatever directory the process started in. That breaks workspace isolation and can expose the wrong tree. Reject the request unless a concrete workspace path is present instead of silently defaulting to ".". As per coding guidelines, "All API endpoints must require workspace_path query parameter".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@codeframe/ui/routers/session_chat_ws.py` around lines 303 - 312, The code
currently uses Path(session.get("workspace_path") or ".") which falls back to
the server CWD; instead, validate that session.get("workspace_path") is present
and reject the request if it's missing. Modify the handler before calling
session_chat_manager.get_interrupt_event/_run_streaming_adapter to check
session.get("workspace_path") (or a dedicated variable), and if it is None/empty
send an error response or close the websocket with a clear error (e.g.,
raise/return an HTTP/WebSocket error) so no adapter_task is created with a "."
workspace_path; keep all references to session_id, interrupt_event,
db.interactive_sessions and _run_streaming_adapter unchanged except for gating
them behind this validation.
- _truncate_history: drop in pairs to avoid stranding assistant message at index 0 (Anthropic API rejects messages not starting with user role) - _stream_turn: add system= prompt describing CodeFrame context and read-only tool constraints - session_chat_ws: log warning when workspace_path is missing from session - _estimate_cost: add last-verified date to pricing table comment - tests: add TestTruncateHistory covering pair-drop and user-first invariant
|
Review: StreamingChatAdapter (503) |
Review: StreamingChatAdapter (503)Solid implementation replacing the echo stub with real Anthropic streaming. Clean architecture, good safe-tools allowlist, comprehensive test coverage. Issues to address before merge: BUG: Missing DONE event on interrupt. When interrupt_event fires inside _stream_turn loop, method returns without yielding DONE. The old stub sent type:done on interrupt. If the WS consumer waits for that signal to close, this path will hang. Fix: yield DONE before each early return, or push it onto the queue unconditionally after the async-for loop in _run_streaming_adapter. ISSUE: Blocking IO in async context. _load_history() calls db_repo.get_messages() synchronously inside send_message, blocking the event loop. Wrap with asyncio.to_thread to match the pattern in _persist_turn. ISSUE: Private symbol imports. _READ_FILE_SCHEMA, _LIST_FILES_SCHEMA, _SEARCH_CODEBASE_SCHEMA are _-prefixed private details of tools.py. Export them publicly or move to a shared schemas module. ISSUE: input_json_delta silently discarded. Handler just passes. Tool inputs reconstructed post-hoc via _rebuild_tool_inputs which works, but the silent discard is non-obvious. Add a comment explaining the rationale. Minor: _truncate_history is O(n^2) due to repeated list slicing. Missing type annotations on send_message and init. Hardcoded pricing in _estimate_cost will drift - add a comment pointing to Anthropic pricing docs. Looks good: STREAMING_SAFE_TOOLS read-only allowlist is the right security boundary. asyncio.to_thread usage for _persist_turn and _execute_tool is correct. Tool-use loop interrupt checks are in the right place. 15 tests cover all event types cleanly. core/adapters/ placement keeps core headless. |
CodeFRAME Development GuidelinesLast updated: 2026-03-09 Product VisionCodeFrame is a project delivery system: Think → Build → Prove → Ship. It owns the edges of the AI coding pipeline — everything BEFORE code gets written (PRD, specification, task decomposition) and everything AFTER (verification gates, quality memory, deployment). The actual code writing is delegated to frontier coding agents (Claude Code, Codex, OpenCode) that are better at it than any custom agent. CodeFrame does not compete with coding agents. It orchestrates them. Status: Phase 1 ✅ | Phase 2 ✅ | Phase 2.5 ✅ — CLI workflow, server layer, and ReAct agent complete. Agent adapter architecture (#408) and PROOF9 quality system (#422) are next priorities. See If you are an agent working in this repo: do not improvise architecture. Follow the documents listed below. Primary Contract (MUST FOLLOW)
Rule 0: If a change does not directly support the Think → Build → Prove → Ship pipeline, do not implement it. Strategic Priority (Phase 4)The next major architectural work is the Agent Adapter Architecture (#408):
Current Reality (Phase 1, 2 & 2.5 Complete)What's Working Now
v2 Architecture (current)
v1 Legacy
Repository StructureArchitecture Rules (non-negotiable)1) Core must be headless
Core is allowed to:
2) CLI must not require a serverGolden Path commands must work from the CLI with no server running. FastAPI is optional and must be started explicitly (e.g., 3) Agent state transitions flow through runtimeCritical pattern discovered during implementation:
This separation prevents duplicate state transitions (e.g., DONE→DONE, BLOCKED→BLOCKED errors). 4) Legacy can be read, not depended onLegacy code is reference material.
5) Keep commits runnableAt all times:
Agent System ArchitectureComponents
Model Selection StrategyTask-based heuristic via
Future: Engine SelectionCodeFRAME supports two execution engines, selected via
Execution Flow (ReAct — default)Execution Flow (Plan — legacy,
|
| Phase | Focus | Pipeline Stage | Status |
|---|---|---|---|
| 1 | CLI Completion | Think + Build | ✅ Complete |
| 2 | Server Layer | Build (API) | ✅ Complete |
| 2.5 | ReAct Agent | Build (execution) | ✅ Complete |
| 3 | Web UI Rebuild | All (dashboard) | In Progress |
| 4 | Agent Adapters + Orchestration | Build (delegate to frontier agents) | Next |
| 5 | PROOF9 + Advanced | Prove + Ship (quality memory) | Planned |
Phase 2 Complete: Server Layer (2026-02-03)
Phase 2 deliverables completed:
- ✅ Server audit and refactor ([Phase 2] Server audit and refactor - routes delegating to core modules #322) - 16 v2 routers following thin adapter pattern
- ✅ API key authentication (feat(auth): add API key authentication for CLI and REST API #326) - Scopes: read/write/admin
- ✅ Rate limiting (feat(security): add API rate limiting with slowapi #327) - Configurable per-endpoint with Redis support
- ✅ Real-time SSE streaming (feat(streaming): add real-time SSE events for task execution #328) -
/api/v2/tasks/{id}/stream - ✅ OpenAPI documentation ([Phase 2] Complete OpenAPI documentation for all endpoints #119) - Full Swagger/ReDoc with examples
Server Architecture (Phase 2)
Pattern: Thin adapter over core - server routes delegate to core.* modules.
CLI (typer) ─┬── core.* ─── adapters.*
│
Server (fastapi) ─┘
V2 Router Modules (16 total):
| Router | Endpoints | Purpose |
|---|---|---|
blockers_v2 |
5 | Blocker CRUD |
prd_v2 |
8 | PRD management + versioning |
tasks_v2 |
12 | Task management + streaming |
workspace_v2 |
5 | Init, status, tech stack |
batches_v2 |
5 | Batch execution strategies |
streaming_v2 |
2 | SSE event streaming |
api_key_v2 |
4 | API key management |
discovery_v2 |
5 | PRD discovery sessions |
checkpoints_v2 |
6 | State checkpoints |
schedule_v2 |
3 | Task scheduling |
templates_v2 |
4 | PRD templates |
git_v2 |
3 | Git operations |
review_v2 |
2 | Code review |
pr_v2 |
5 | GitHub PR workflow |
environment_v2 |
4 | Tool detection |
proof_v2 |
7 | PROOF9 quality gates + requirements |
API Authentication:
# Create API key
cf auth api-key-create --name "my-key" --scopes read,write
# Use in requests
curl -H "X-API-Key: cf_..." https://api.example.com/api/v2/tasksRate Limiting:
- Default: 100 requests/minute (standard endpoints)
- Auth endpoints: 10/minute
- AI endpoints: 20/minute
- Configurable via
RATE_LIMIT_*environment variables
OpenAPI Documentation:
- Swagger UI:
/docs - ReDoc:
/redoc - OpenAPI JSON:
/openapi.json
Previous Updates (2026-01-29)
V2 Strategic Roadmap Established
Created comprehensive 5-phase roadmap in docs/V2_STRATEGIC_ROADMAP.md.
Phase 1 Complete: CLI Foundation
All Phase 1 priorities completed:
- ✅
cf prd generate- Socratic PRD discovery ([Phase 1] cf prd generate - Interactive AI PRD creation (Socratic Discovery) #307) - ✅
cf work follow- Live execution streaming ([Phase 1] cf work follow - Live execution streaming #308) - ✅ Integration tests for credential/env modules ([Phase 1] Integration tests for credential and environment modules #309)
- ✅ PRD template system ([Phase 1] PRD template system for customizable output formats #316)
Environment Validation (cf env)
New commands for validating development environment:
cf env check # Validate required tools (git, uv, ruff, pytest)
cf env install # Install missing tools automatically
cf env doctor # Comprehensive environment health checkModules:
core/environment.py- Tool detection and validationcore/installer.py- Cross-platform tool installation
GitHub PR Workflow (cf pr)
Streamlined PR management without leaving the CLI:
cf pr create # Create PR from current branch
cf pr status # Show PR status and review state
cf pr checks # Show CI check results
cf pr merge # Merge approved PRTask Self-Diagnosis (cf work diagnose)
AI-powered analysis of failed tasks:
cf work diagnose <task-id> # Analyze why a task failedModules:
core/diagnostics.py- Failed task analysiscore/diagnostic_agent.py- AI-powered diagnosis
Bug Fixes
- [Phase 1] Backend: NoneType error accessing search_pattern during task execution #265: Fixed NoneType error in
codebase_index.search_pattern()- added null check - [Phase 1] Checkpoint diff API returns 500 - workspace directory missing #253: Fixed checkpoint diff API returning 500 - added workspace existence validation
GitHub Issue Organization
- Created
v1-legacylabel for 22 v1-specific issues (closed, retained as Phase 3 reference) - Created phase labels:
phase-1,phase-2,phase-4,phase-5 - Created 9 new issues ([Phase 1] cf prd generate - Interactive AI PRD creation (Socratic Discovery) #307-[Phase 5] Debug and replay mode #315) for roadmap features
- Consistent naming:
[Phase #] Titleformat
Previous Updates (2026-01-16)
Phase 3.1: Tech Stack Configuration
Simplified tech stack configuration using natural language descriptions:
- ✅
tech_stackfield on Workspace model - stores natural language description - ✅
--detectflag - auto-detects from pyproject.toml, package.json, Cargo.toml, go.mod - ✅
--tech-stackflag - explicit tech stack description (e.g., "Rust project with cargo") - ✅
--tech-stack-interactiveflag - simple prompt for user input (stub for future multi-round) - ✅ Agent integration - TaskContext and Planner include tech_stack in LLM prompts
- ✅ Removed
cf configsubcommand - tech stack is now part of workspace init
Design philosophy: Instead of structured configuration with specific package managers and frameworks, users describe their stack in natural language. The agent interprets and adapts.
Examples:
cf init . --detect # Auto-detect: "Python with uv, pytest, ruff for linting"
cf init . --tech-stack "Rust project using cargo"
cf init . --tech-stack "TypeScript monorepo with pnpm, Next.js, jest"
cf init . --tech-stack-interactive # Prompts user for descriptionFuture work: Multi-round interactive discovery (bead: codeframe-8d80)
Agent Self-Correction & Observability
Improved agent reliability with automatic error recovery:
- ✅ Self-correction loop in
_run_final_verification()- agent retries up to 3 times - ✅ Verbose mode (
--verbose/-v) - shows detailed verification/self-correction progress - ✅ FAILED task status - tasks transition to FAILED for proper error visibility
- ✅ Project preferences - agent loads AGENTS.md/CLAUDE.md for per-project config
- ✅ Fixed
fail_run()- now properly transitions task status (was leaving tasks stuck)
Enhanced Self-Correction (Phase 3.4)
Advanced error recovery with loop prevention and smart escalation:
-
✅ Fix Attempt Tracker (
core/fix_tracker.py) - prevents repeating failed fixes- Normalizes errors for comparison (removes line numbers, memory addresses)
- Tracks (error_signature, fix_description) pairs with outcomes
- Detects escalation patterns (same error 3+ times, same file 3+ times)
-
✅ Pattern-Based Quick Fixes (
core/quick_fixes.py) - fixes common errors without LLMModuleNotFoundError→ auto-install package (detects package manager)ImportError→ add missing import statementNameError→ add common imports (Optional, dataclass, Path, etc.)SyntaxError→ fix missing colons, f-string prefixesIndentationError→ normalize mixed tabs/spaces
-
✅ Escalation to Blocker - creates informative blockers when stuck
- Triggered after MAX_SAME_ERROR_ATTEMPTS (3) failures on same error
- Triggered after MAX_SAME_FILE_ATTEMPTS (3) failures on same file
- Triggered after MAX_TOTAL_FAILURES (5) in a run
- Blocker includes error type, attempted fixes, and guidance questions
Self-Correction Flow
Error occurs
│
├── Try ruff --fix (auto-lint)
│
├── Try pattern-based quick fix (no LLM)
│ ├── Check if fix already attempted → skip
│ ├── Apply fix
│ └── Record outcome in tracker
│
├── Check escalation threshold
│ └── If exceeded → create escalation blocker
│
└── Use LLM to generate fix plan
├── Include already-tried fixes to avoid repetition
├── Execute fix steps with tracking
└── Re-verify
Key Self-Correction Methods
_run_final_verification(): While loop that re-runs gates after self-correction_attempt_verification_fix(): Orchestrates quick fixes, escalation check, LLM fixes_create_escalation_blocker(): Creates detailed blocker with context_verbose_print(): Conditional stdout output for observability
Phase 2 Complete (2026-01-15): Parallel Batch Execution
All 6 Phase 2 items from CLI_WIREFRAME.md are done:
- ✅
work batch resume <batch-id>- re-run failed/blocked tasks - ✅
depends_onfield on Task model - ✅ Dependency graph analysis (DAG, cycle detection, topological sort)
- ✅ True parallel execution with ThreadPoolExecutor worker pool
- ✅
--strategy autowith LLM-based dependency inference - ✅
--retry Nautomatic retry of failed tasks
Key Phase 2 Modules
- conductor.py: Batch orchestration with serial/parallel/auto strategies
- dependency_graph.py: DAG operations, level-based grouping for parallelization
- dependency_analyzer.py: LLM analyzes task descriptions to infer dependencies
Agent Implementation Complete (2026-01-14)
All 8 implementation tasks from AGENT_IMPLEMENTATION_TASKS.md are done:
- ✅ LLM Adapter Interface (
adapters/llm/) - ✅ Task Context Loader (
core/context.py) - ✅ Agent Planning (
core/planner.py) - ✅ Code Execution Engine (
core/executor.py) - ✅ Automatic Blocker Detection (in
core/agent.py) - ✅ Gate Integration (in
core/agent.py) - ✅ Agent Orchestrator (
core/agent.py) - ✅ Wire into Runtime (
core/runtime.py)
Bug Fixes During Testing
- GateResult attribute access: Fixed
gate_result.status→gate_result.passed - Duplicate task transitions: Removed task status updates from agent.py (runtime handles all)
- READY→READY error: Added check in
stop_runbefore transitioning - Verification step handling: Made
_execute_verificationsmarter about file vs command targets
Key Design Decisions
- State separation: Agent manages AgentState, Runtime manages TaskStatus
- Model selection: Task-based heuristic via Purpose enum
- Blocker creation: Agent creates blockers, Runtime updates task status
- Verification: Incremental (ruff after each file change) + final (all gates)
Testing
Run all tests
uv run pytestRun v2 tests only
uv run pytest -m v2 # All v2 tests (~411 tests)
uv run pytest -m v2 -q # Quiet modeThe v2 marker identifies tests for CLI-first, headless functionality:
- All tests in
tests/core/are automatically marked v2 (via conftest.py) - v2 CLI tests have
pytestmark = pytest.mark.v2at the top
Convention: When adding new v2 functionality, mark tests with @pytest.mark.v2 or add pytestmark = pytest.mark.v2 at module level for CLI tests that use codeframe.cli.app.
Run core module tests
uv run pytest tests/core/
uv run pytest tests/core/test_agent.py -v
uv run pytest tests/adapters/test_llm.py -vTest coverage
uv run pytest --cov=codeframe --cov-report=htmlEnvironment Variables
# Required for agent execution
ANTHROPIC_API_KEY=sk-ant-...
# Optional - Database
DATABASE_PATH=./codeframe.db
# Optional - Rate Limiting (Phase 2)
RATE_LIMIT_ENABLED=true # Enable/disable rate limiting
RATE_LIMIT_DEFAULT=100/minute # Default limit
RATE_LIMIT_AUTH=10/minute # Auth endpoints
RATE_LIMIT_AI=20/minute # AI/LLM endpoints
RATE_LIMIT_WEBSOCKET=50/minute # WebSocket connections
REDIS_URL=redis://localhost:6379 # Redis for distributed rate limiting (optional)
# Optional - API Server
CODEFRAME_API_KEY_SECRET=<random-secret> # Secret for API key hashingLegacy sections removed on purpose
This file previously contained extensive v1 details (auth, websocket, UI template, sprint history).
Those are still in git history and legacy docs, but they are not the current contract.
The current contract is Golden Path + Refactor Plan + Command Tree mapping + Agent Implementation.
|
Review content pending - needs file write permission |
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (4)
codeframe/core/adapters/streaming_chat.py (4)
378-381:⚠️ Potential issue | 🟠 MajorEmit
DONEwhen an interrupt trips mid-stream.This branch exits the generator without a terminal event, so interrupted responses never produce a clear end-of-turn signal.
🔚 Proposed fix
# Honour interrupt between chunks if interrupt_event and interrupt_event.is_set(): - return + yield ChatEvent(type=ChatEventType.DONE) + return🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@codeframe/core/adapters/streaming_chat.py` around lines 378 - 381, The generator loop at "async for sdk_event in stream" returns immediately when "if interrupt_event and interrupt_event.is_set()" is true, causing no terminal event to be emitted; modify that branch to emit/return a terminal DONE event (the same kind used elsewhere for end-of-turn) before exiting so interrupted responses still produce a clear end-of-turn signal—update the interrupt branch in the generator around async for sdk_event in stream to send the standard DONE event (using the same event type/constructor the codebase uses for normal completion) and then return.
427-432:⚠️ Potential issue | 🔴 CriticalInsert the assistant
tool_usemessage before the follow-uptool_resultmessage.The retry request appends only the user-side
tool_resultblocks. Anthropic's tool-call flow requires the assistant message containing thetool_useblocks to appear immediately before that user message, so real tool turns can 400 even though the mocked test passes. (platform.claude.com)Also applies to: 477-480
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@codeframe/core/adapters/streaming_chat.py` around lines 427 - 432, The code rebuilds tool inputs from final_msg.content and then appends only the user-side tool_result, but Anthropic requires the assistant-side tool_use message to precede any user tool_result; update the logic in streaming_chat.py around stream.get_final_message()/final_msg handling so that when pending_tool_calls exists and final_msg has content you first synthesize and insert an assistant "tool_use" message corresponding to the pending_tool_calls (using the same structure used for real tool turns) immediately before appending the user "tool_result" message, then call _rebuild_tool_inputs(final_msg.content, pending_tool_calls) to reconstruct inputs; ensure the same fix is applied to the analogous block handling final messages later (the block around the other occurrence near the second _rebuild_tool_inputs call).
199-200:⚠️ Potential issue | 🟠 MajorLoad the newest history window, not the repo default.
InteractiveSessionRepository.get_messages()defaults to the oldest 100 rows (ORDER BY created_at LIMIT 100), so once a session grows past 100 messages this adapter reconstructs stale context before_truncate_history()ever runs.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@codeframe/core/adapters/streaming_chat.py` around lines 199 - 200, The adapter is calling InteractiveSessionRepository.get_messages(self._session_id) which returns the oldest 100 rows by default, causing stale context when sessions exceed that size; change the call to fetch the newest history window instead (e.g. pass explicit params to get_messages to ORDER BY created_at DESC and LIMIT to the desired window size or call a repository method that returns the latest N messages) so that rows = self._db_repo.get_messages(self._session_id, order="desc", limit=<history_window>) (or equivalent) is used before reconstructing context and before _truncate_history() runs.
241-260:⚠️ Potential issue | 🟠 MajorSplit persistence into “user before stream” and “assistant after completion.”
The user row is only written after
_stream_turn()returns. That drops prompts on errors or cancellations, and it also lets interrupted turns persist partial assistant text as if the response finished.Also applies to: 342-345
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@codeframe/core/adapters/streaming_chat.py` around lines 241 - 260, The current _persist_turn writes both user and assistant rows only after _stream_turn completes, which loses user prompts on errors/cancels and can record partial assistant text as finished; fix by splitting persistence so the user message is persisted immediately before streaming begins (call self._db_repo.add_message with role="user" and content=user_content on the main flow before invoking _stream_turn) and keep only the assistant persistence for completed responses in _persist_turn (or an _persist_assistant method) so that assistant text is written only after successful completion (and handle cancellations/errors to avoid writing partial assistant content); update references to _persist_turn, _stream_turn, and add_message accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@codeframe/core/adapters/streaming_chat.py`:
- Around line 477-480: The history budget is only enforced once via
_truncate_history() before the first API call, so after appending
tool_result_blocks to current_messages you must re-apply truncation to avoid
exceeding the 180k budget on the next messages.stream() call; update the retry
path in the streaming logic to call _truncate_history(current_messages or the
equivalent message list) immediately after appending
{"role":"user","content":tool_result_blocks} (and before invoking
messages.stream() or any retry of send/stream), ensuring you pass the same
parameters used originally (e.g., model/context limits) so the truncated message
list is safe for subsequent calls.
---
Duplicate comments:
In `@codeframe/core/adapters/streaming_chat.py`:
- Around line 378-381: The generator loop at "async for sdk_event in stream"
returns immediately when "if interrupt_event and interrupt_event.is_set()" is
true, causing no terminal event to be emitted; modify that branch to emit/return
a terminal DONE event (the same kind used elsewhere for end-of-turn) before
exiting so interrupted responses still produce a clear end-of-turn signal—update
the interrupt branch in the generator around async for sdk_event in stream to
send the standard DONE event (using the same event type/constructor the codebase
uses for normal completion) and then return.
- Around line 427-432: The code rebuilds tool inputs from final_msg.content and
then appends only the user-side tool_result, but Anthropic requires the
assistant-side tool_use message to precede any user tool_result; update the
logic in streaming_chat.py around stream.get_final_message()/final_msg handling
so that when pending_tool_calls exists and final_msg has content you first
synthesize and insert an assistant "tool_use" message corresponding to the
pending_tool_calls (using the same structure used for real tool turns)
immediately before appending the user "tool_result" message, then call
_rebuild_tool_inputs(final_msg.content, pending_tool_calls) to reconstruct
inputs; ensure the same fix is applied to the analogous block handling final
messages later (the block around the other occurrence near the second
_rebuild_tool_inputs call).
- Around line 199-200: The adapter is calling
InteractiveSessionRepository.get_messages(self._session_id) which returns the
oldest 100 rows by default, causing stale context when sessions exceed that
size; change the call to fetch the newest history window instead (e.g. pass
explicit params to get_messages to ORDER BY created_at DESC and LIMIT to the
desired window size or call a repository method that returns the latest N
messages) so that rows = self._db_repo.get_messages(self._session_id,
order="desc", limit=<history_window>) (or equivalent) is used before
reconstructing context and before _truncate_history() runs.
- Around line 241-260: The current _persist_turn writes both user and assistant
rows only after _stream_turn completes, which loses user prompts on
errors/cancels and can record partial assistant text as finished; fix by
splitting persistence so the user message is persisted immediately before
streaming begins (call self._db_repo.add_message with role="user" and
content=user_content on the main flow before invoking _stream_turn) and keep
only the assistant persistence for completed responses in _persist_turn (or an
_persist_assistant method) so that assistant text is written only after
successful completion (and handle cancellations/errors to avoid writing partial
assistant content); update references to _persist_turn, _stream_turn, and
add_message accordingly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d0293c5b-a769-4481-a100-8cca2625cdac
📒 Files selected for processing (3)
codeframe/core/adapters/streaming_chat.pycodeframe/ui/routers/session_chat_ws.pytests/core/test_streaming_chat.py
🚧 Files skipped from review as they are similar to previous changes (1)
- codeframe/ui/routers/session_chat_ws.py
| # Append the tool results as a user message for the next turn | ||
| current_messages = current_messages + [ | ||
| {"role": "user", "content": tool_result_blocks} | ||
| ] |
There was a problem hiding this comment.
Re-apply the history budget before each tool-driven retry.
_truncate_history() only runs before the first API call. After appending tool_result blocks here, a large read_file or search_codebase result can push the very next messages.stream() call past the 180k budget.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@codeframe/core/adapters/streaming_chat.py` around lines 477 - 480, The
history budget is only enforced once via _truncate_history() before the first
API call, so after appending tool_result_blocks to current_messages you must
re-apply truncation to avoid exceeding the 180k budget on the next
messages.stream() call; update the retry path in the streaming logic to call
_truncate_history(current_messages or the equivalent message list) immediately
after appending {"role":"user","content":tool_result_blocks} (and before
invoking messages.stream() or any retry of send/stream), ensuring you pass the
same parameters used originally (e.g., model/context limits) so the truncated
message list is safe for subsequent calls.
Review: StreamingChatAdapterGood work replacing the echo stub with a real adapter. The event model, safe tool set, interrupt support, and test structure are all solid. A few issues worth addressing before merge. Bug: Missing assistant message in tool loop In _stream_turn, when pending tool calls are executed the code builds the next turn by appending only the user message with tool_result blocks, but never first appends the assistant message containing the tool_use blocks. The Anthropic API requires alternating assistant (with tool_use) then user (with tool_result). Sending only the tool_result user message will cause API errors or context corruption on any real tool loop. The final message from stream.get_final_message() already has the full assistant content; it just needs to be added to current_messages before the tool_result turn. Bug: DONE event not emitted after ERROR In send_message, when _stream_turn raises, an ERROR event is yielded and the generator returns with no DONE following. Similarly in _run_streaming_adapter, exceptions before iteration starts send an error dict but no done dict. The relay task almost certainly loops until it sees done, so without it the WebSocket relay will hang. A DONE event should always be the final event, even on error. API confusion: empty list vs. no history The check 'if not history' treats [] (valid empty history) the same as None (no history supplied). There is no way to skip DB loading for a fresh session. Using None as the sentinel would make the intent explicit. Also: _load_history() is a synchronous DB call invoked from async send_message without asyncio.to_thread, which will block the event loop under load. No iteration limit in the tool loop The while True loop in _stream_turn has no max-iteration guard. A model that repeatedly returns stop_reason == tool_use would loop indefinitely and rack up API costs. A simple cap with a logged warning would prevent this. TOOL_USE_START emits empty tool_input for streamed inputs At content_block_start time, getattr(block, 'input', {}) is typically empty because input arrives via subsequent input_json_delta events (currently ignored with pass). The TOOL_USE_START event is yielded immediately with an empty dict. _rebuild_tool_inputs corrects this internally but the client already received the empty payload. Worth a comment at minimum. Security: workspace_path falls back silently to CWD When raw_workspace is None, the fallback is Path('.'), scoping all file reads to the server CWD. The warning is logged but the adapter proceeds, which could expose server files depending on deployment. Consider raising an error or emitting an error event instead. Minor send_message declares -> AsyncIterator[ChatEvent] but uses yield, making it an AsyncGenerator. The return type should be AsyncGenerator[ChatEvent, None]. The test test_yields_tool_use_start_and_result relies on the message_stop flush path (no content_block_stop event) — worth a comment that this is intentional coverage of that branch. Summary Critical: (1) missing assistant message in tool loop, (2) missing DONE-after-ERROR. Everything else is medium priority. Test coverage is good; a test that exercises two or more full tool loop turns would catch the tool loop bug directly. |
…g_adapter _run_streaming_adapter now takes db_repo and workspace_path as additional arguments. Tests that patched it with 4-arg fake adapters would silently fail (asyncio task drops TypeError) and hang waiting for queue events that never arrive. Updated all three fake adapters to accept the full 6-argument signature.
Review — round 3 (after commit e19eb48)Good progress on commit The two critical bugs flagged in my second review are still unresolved. Critical — still blocking merge1. Missing assistant message in the tool loop
# current code (line ~495)
current_messages = current_messages + [
{"role": "user", "content": tool_result_blocks}
]The Anthropic Messages API requires the assistant's current_messages = current_messages + [
{"role": "assistant", "content": final_msg.content}, # tool_use blocks
{"role": "user", "content": tool_result_blocks}, # tool_result blocks
]
2. Missing DONE after ERROR In except Exception as exc:
yield ChatEvent(type=ChatEventType.ERROR, content=str(exc))
return # ← relay task loops forever waiting for DONEFix: except Exception as exc:
yield ChatEvent(type=ChatEventType.ERROR, content=str(exc))
yield ChatEvent(type=ChatEventType.DONE)
returnMedium — should fix before merge3. Interrupt inside the stream loop returns without DONE The check at the top of if interrupt_event and interrupt_event.is_set():
yield ChatEvent(type=ChatEventType.DONE)
return4.
# In send_message:
if history is None:
history = await asyncio.to_thread(self._db_repo.get_messages, self._session_id)
history = [{"role": r["role"], "content": r["content"]} for r in history]This also resolves the 5. No iteration limit on the A model stuck in a Minor
After a tool turn, def _count(msgs):
total = 0
for m in msgs:
c = m.get("content") or ""
if isinstance(c, str):
total += len(enc.encode(c))
elif isinstance(c, list):
total += sum(len(enc.encode(b.get("content") or "")) for b in c if isinstance(b, dict))
return totalSummary
The two critical issues will cause observable failures the first time a real tool call is made and the first time the Anthropic API returns an error. Please address those before merge. |
Summary
StreamingChatAdapterincodeframe/core/adapters/streaming_chat.py— wrapsanthropic.AsyncAnthropic().messages.stream()and yields typedChatEventobjectssession_chat_ws.py(_run_streaming_adapter) with a real adapter call, wiring indb.interactive_sessionsandworkspace_pathStreamingChatAdapter,ChatEvent,ChatEventType,STREAMING_SAFE_TOOLSfromcodeframe/core/adapters/__init__.pytests/core/test_streaming_chat.pycovering all event typesWhat's implemented
Event types:
TEXT_DELTA,THINKING,TOOL_USE_START,TOOL_RESULT,COST_UPDATE,DONE,ERRORSafe read-only tool set:
read_file,list_files,search_codebase— no writes or shell execution in interactive sessionsTool loop: streams → executes tools via
asyncio.to_thread(execute_tool, ...)→ re-enters stream untilstop_reason == "end_turn"Interrupt: checks
interrupt_event.is_set()between every chunk; closes stream early when setPersistence: loads history from
InteractiveSessionRepository.get_messages(), persists user + assistant messages viaadd_message()after turn completesContext management: tiktoken-based history truncation to 180k token budget
Cost tracking:
ChatEvent.to_dict()emitscost_usd,input_tokens,output_tokensfor relay → DB accumulationTest plan
StreamingChatAdapter.send_message()yieldsTEXT_DELTAevents (not buffered)TOOL_USE_START+TOOL_RESULTTHINKINGevents emitted fromthinking_deltaSDK eventsCOST_UPDATEemitted with correct token countsinterrupt_event.set()stops stream within current turnsession_messagesafter complete turnsend_messagecallERRORevent emitted on API failure (no uncaught exception)tests/core/Closes #503
Summary by CodeRabbit
New Features
Bug Fixes
Tests