feat(core): StreamingChatAdapter for token-by-token Anthropic relay (#503)#512

Merged: frankbria merged 4 commits into main from feat/503-streaming-chat-adapter on Mar 31, 2026

Conversation

@frankbria frankbria commented Mar 31, 2026

Owner

Summary

  • Implements StreamingChatAdapter in codeframe/core/adapters/streaming_chat.py — wraps anthropic.AsyncAnthropic().messages.stream() and yields typed ChatEvent objects
  • Replaces the echo stub in session_chat_ws.py (_run_streaming_adapter) with a real adapter call, wiring in db.interactive_sessions and workspace_path
  • Exports StreamingChatAdapter, ChatEvent, ChatEventType, STREAMING_SAFE_TOOLS from codeframe/core/adapters/__init__.py
  • 15 unit tests in tests/core/test_streaming_chat.py covering all event types

What's implemented

Event types: TEXT_DELTA, THINKING, TOOL_USE_START, TOOL_RESULT, COST_UPDATE, DONE, ERROR

Safe read-only tool set: read_file, list_files, search_codebase — no writes or shell execution in interactive sessions

Tool loop: streams → executes tools via asyncio.to_thread(execute_tool, ...) → re-enters stream until stop_reason == "end_turn"

Interrupt: checks interrupt_event.is_set() between every chunk; closes stream early when set

Persistence: loads history from InteractiveSessionRepository.get_messages(), persists user + assistant messages via add_message() after turn completes

Context management: tiktoken-based history truncation to 180k token budget

Cost tracking: ChatEvent.to_dict() emits cost_usd, input_tokens, output_tokens for relay → DB accumulation
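
For readers who have not opened the diff, the event model described above could look roughly like the following. This is a minimal sketch, not the code in streaming_chat.py: the `data` payload bag and most of the lowercase wire values are assumptions (only `tool_use_start`, `tool_result`, and `done` are named elsewhere in this thread).

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class ChatEventType(str, Enum):
    # Wire values are assumed; the real enum may spell them differently.
    TEXT_DELTA = "text_delta"
    THINKING = "thinking"
    TOOL_USE_START = "tool_use_start"
    TOOL_RESULT = "tool_result"
    COST_UPDATE = "cost_update"
    DONE = "done"
    ERROR = "error"


@dataclass
class ChatEvent:
    type: ChatEventType
    # Hypothetical payload bag; the real dataclass may use explicit fields.
    data: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Flatten the event into a JSON-serialisable dict for the relay."""
        return {"type": self.type.value, **self.data}
```

A COST_UPDATE event built this way would carry cost_usd, input_tokens, and output_tokens as top-level keys next to `type`, which is the shape the relay needs for DB accumulation.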

Test plan

  • StreamingChatAdapter.send_message() yields TEXT_DELTA events (not buffered)
  • Tool calls yield TOOL_USE_START + TOOL_RESULT
  • THINKING events emitted from thinking_delta SDK events
  • COST_UPDATE emitted with correct token counts
  • interrupt_event.set() stops stream within current turn
  • Messages persisted to session_messages after complete turn
  • History reconstructed from DB on each send_message call
  • ERROR event emitted on API failure (no uncaught exception)
  • All 15 tests pass; no regressions in tests/core/

Closes #503

Summary by CodeRabbit

  • New Features

    • Real-time streaming chat with typed incremental events, session-history handling, and background persistence of turns.
    • Tool invocation during chats restricted to a safe read-only set; tool outputs are injected back into the conversation.
    • WebSocket flow now forwards adapter events and surfaces adapter errors reliably.
  • Bug Fixes

    • Improved error handling, graceful persistence failures, and interruptible streaming.
  • Tests

    • Extensive tests covering streaming, tools, interrupts, cost updates, persistence, and error paths.

…503)

Implements StreamingChatAdapter in codeframe/core/adapters/streaming_chat.py:
- AsyncIterator[ChatEvent] over anthropic.AsyncAnthropic().messages.stream()
- ChatEventType enum: TEXT_DELTA, TOOL_USE_START, TOOL_RESULT, THINKING,
  COST_UPDATE, DONE, ERROR
- Read-only safe tool set: read_file, list_files, search_codebase
- Internal tool-use loop: streams → executes tools → re-enters stream
- Interrupt support via asyncio.Event checked between chunks
- History load/persist via InteractiveSessionRepository.get_messages/add_message
- tiktoken-based context truncation to 180k token budget
- Cost estimation per turn

Replaces the echo stub in session_chat_ws.py with a real adapter call.
Adds 15 unit tests covering all event types, tool calls, interrupt,
persistence, and error handling.

coderabbitai Bot commented Mar 31, 2026

Contributor

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f40312d9-5463-4f46-877b-fa77d4bc10bf

📥 Commits

Reviewing files that changed from the base of the PR and between e19eb48 and 3d90746.

📒 Files selected for processing (1)
  • tests/api/test_session_chat_ws.py

Walkthrough

Adds a new StreamingChatAdapter that wraps Anthropic's async streaming API, emits typed ChatEvent objects (text deltas, thinking, tool lifecycle, cost, done, error), manages/truncates history, executes a safe tool subset, persists completed turns in background, and replaces the previous inline token-stub in the WebSocket router.

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| Adapter exports | codeframe/core/adapters/__init__.py | Re-exported ChatEvent, ChatEventType, StreamingChatAdapter, and STREAMING_SAFE_TOOLS from the new streaming adapter module. |
| Streaming chat implementation | codeframe/core/adapters/streaming_chat.py | New module implementing StreamingChatAdapter, ChatEventType enum, ChatEvent dataclass, streaming via AsyncAnthropic().messages.stream(), history loading/truncation with tiktoken fallback, chunked emission (TEXT_DELTA, THINKING, tool lifecycle events, COST_UPDATE, DONE, ERROR), safe tool execution in threadpool, interrupt handling, best-effort cost estimation, and background persistence. |
| WebSocket integration | codeframe/ui/routers/session_chat_ws.py | Replaced the inline token-stub with StreamingChatAdapter usage in _run_streaming_adapter; updated signature to accept db_repo and workspace_path: Path, read workspace_path from session (warn/default), and forward adapter event.to_dict() into the token queue with exception-to-error-event handling. |
| Tests | tests/core/test_streaming_chat.py, tests/api/test_session_chat_ws.py | Added comprehensive unit tests for StreamingChatAdapter (mocked DB and Anthropic client) covering initialization, API-key/model handling, history loading/truncation, streaming events, tool-call flow, interrupt semantics, persistence, and error propagation. Adjusted WebSocket-stream tests to match updated adapter signature. |
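
The "exception-to-error-event handling" in the WebSocket integration row can be pictured with a small relay loop. The sketch below is illustrative only: `relay_events` is a hypothetical stand-in for `_run_streaming_adapter`, and the error payload keys are assumptions.

```python
import asyncio
from typing import Any, AsyncIterator


async def relay_events(
    events: AsyncIterator[dict[str, Any]],
    token_queue: "asyncio.Queue[dict[str, Any]]",
) -> None:
    """Forward adapter events to the queue; convert failures into an error event."""
    try:
        async for event in events:
            await token_queue.put(event)
    except Exception as exc:
        # asyncio.CancelledError derives from BaseException, so task
        # cancellation still propagates instead of being swallowed here.
        await token_queue.put({"type": "error", "message": str(exc)})
```

The consumer of the queue then sees either a normal event sequence or a sequence that ends in a single error event, never an uncaught exception from the adapter task.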

Sequence Diagram

sequenceDiagram
    participant Client as WebSocket Client
    participant Router as session_chat_ws Router
    participant Adapter as StreamingChatAdapter
    participant Anthropic as Anthropic SDK
    participant ToolExec as Tool Executor
    participant DB as Database

    Client->>Router: send_message(content)
    Router->>Router: obtain workspace_path
    Router->>Adapter: new(session_id, db_repo, workspace_path)
    Adapter->>DB: _load_history(session_id)
    DB-->>Adapter: prior messages

    Router->>Adapter: send_message(content, history)
    Adapter->>Anthropic: AsyncAnthropic().messages.stream()

    loop stream chunks
        Anthropic-->>Adapter: RawContentBlockDeltaEvent
        Adapter-->>Router: ChatEvent(TEXT_DELTA / THINKING)
        Router->>Client: websocket event.to_dict()
        alt tool use start
            Anthropic-->>Adapter: RawContentBlockStartEvent(tool_use)
            Adapter-->>Router: ChatEvent(TOOL_USE_START)
            Router->>Client: tool_use_start event

            Adapter->>ToolExec: execute safe tool (threadpool)
            ToolExec-->>Adapter: tool output
            Adapter-->>Router: ChatEvent(TOOL_RESULT)
            Router->>Client: tool_result event

            Adapter->>Anthropic: append tool result as user message (continue)
        end
    end

    Anthropic-->>Adapter: MessageStopEvent (usage)
    Adapter-->>Router: ChatEvent(COST_UPDATE)
    Adapter->>DB: _persist_turn(user_msg, assistant_text) (background)
    Adapter-->>Router: ChatEvent(DONE)
    Router->>Client: done event

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰
I nibbled tokens one by one,
Emitting deltas in the sun.
Tools stayed safe, the history slimmed,
Saved each word before it dimmed—
Hops and bytes and carrots spun! 🥕✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 27.69% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (4 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly and specifically describes the main change: adding a StreamingChatAdapter for token-by-token relay from Anthropic's API, which is the primary focus of all code changes. |
| Linked Issues check | ✅ Passed | All linked issue #503 requirements are met: StreamingChatAdapter implemented with ChatEvent/ChatEventType, safe tool set (read_file, list_files, search_codebase), interruption support, history management with tiktoken truncation, cost tracking, and comprehensive unit tests covering all acceptance criteria. |
| Out of Scope Changes check | ✅ Passed | All changes are directly related to implementing the streaming chat adapter: new adapter module, exports, WebSocket integration, and tests. No extraneous changes detected. |

claude Bot commented Mar 31, 2026

Code Review — StreamingChatAdapter (#512)

Good implementation overall. Replacing the echo stub with a real streaming adapter is the right move, and the event-driven design with typed ChatEvent objects is clean. A few things worth addressing before merge:


🔴 Medium — History truncation can produce invalid API messages

_truncate_history drops the oldest message one at a time. If the dropped message is a user turn, the remaining list can start with an assistant message at index 0, which the Anthropic Messages API rejects (the first message must have role: user).

# Current — drops messages one-by-one from the front:
while messages and _count(messages) > _MAX_HISTORY_TOKENS:
    messages = messages[1:]

Fix: always drop in pairs (user + the assistant reply that follows), or enforce that the result list starts with a user message:

while messages and _count(messages) > _MAX_HISTORY_TOKENS:
    # Drop a pair so we don't strand an assistant message at index 0
    messages = messages[2:] if len(messages) >= 2 else messages[1:]
# Ensure first message is from the user
while messages and messages[0]["role"] != "user":
    messages = messages[1:]
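
A standalone version of that fix, for anyone who wants to try it outside the adapter. This is a sketch under assumptions: `approx_tokens` is a crude stand-in for the tiktoken count, and the function keeps at least the newest message so the current user turn is never dropped.

```python
from typing import Callable

Message = dict[str, str]


def approx_tokens(messages: list[Message]) -> int:
    """Rough stand-in for a tokenizer: about four characters per token."""
    return sum(len(m["content"]) // 4 + 1 for m in messages)


def truncate_history(
    messages: list[Message],
    budget: int,
    count: Callable[[list[Message]], int] = approx_tokens,
) -> list[Message]:
    """Drop the oldest turns until the list fits the token budget."""
    trimmed = list(messages)
    # Drop in pairs (user + assistant reply), but never drop the last message.
    while len(trimmed) > 1 and count(trimmed) > budget:
        trimmed = trimmed[2:] if len(trimmed) > 2 else trimmed[1:]
    # Guard against histories that were already misaligned on load.
    while trimmed and trimmed[0]["role"] != "user":
        trimmed = trimmed[1:]
    return trimmed
```

The trailing loop matters even with pair-dropping, because a history loaded from the DB is not guaranteed to start on a user turn.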

🔴 Medium — No system prompt in messages.stream()

_stream_turn calls the API without a system= parameter. The model has no context about what CodeFrame is, what the workspace contains, or what it's expected to help with. An interactive session without a system prompt will behave generically.

At minimum, inject a brief system= describing the read-only nature of the session and the workspace path:

async with self._client.messages.stream(
    model=self._model,
    system=(
        f"You are a CodeFrame assistant. You have read-only access to the workspace at "
        f"{self._workspace_path}. Available tools: read_file, list_files, search_codebase."
    ),
    messages=current_messages,
    ...
)

🟡 Low — Tool interactions are lost from session history

_persist_turn only saves the user's text message and the accumulated TEXT_DELTA content. Multi-step tool call/result exchanges are added to current_messages within the turn loop but never written to the DB. On the next session, the model won't know it previously read any files.

If tool context matters for continuity, persist the full current_messages tail (or a summarized tool trace) rather than just the text delta.


🟡 Low — workspace_path silently falls back to "."

In session_chat_ws.py:

workspace_path = Path(session.get("workspace_path") or ".")

If workspace_path is missing from the session, file tools will operate on the server's CWD, which is probably not the intended workspace. In a multi-tenant setup this leaks access to the wrong directory. At minimum, log a warning; ideally treat a missing workspace_path as an error condition.


🟡 Low — Hardcoded pricing table will become stale

_estimate_cost uses hardcoded per-million-token rates. These change without notice. The comment already says "approximate" — that's fine — but add the date the table was last verified so future reviewers know how stale it may be, and document that claude-sonnet-4-20250514 uses the claude-sonnet-4-5 pricing bucket when Anthropic releases variants.
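
One way to make the staleness visible is to store the verification date next to the rates. The sketch below is illustrative only: the model key, the rates, and the date are placeholders, not real pricing, and `estimate_cost` is a hypothetical stand-in for `_estimate_cost`.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class ModelRates:
    input_per_mtok: float   # USD per million input tokens
    output_per_mtok: float  # USD per million output tokens
    last_verified: date     # when a human last checked these numbers


# Placeholder values for illustration; replace with verified rates.
_RATES = {
    "example-model": ModelRates(3.0, 15.0, date(2026, 3, 31)),
}
_DEFAULT_MODEL = "example-model"


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Best-effort cost in USD; unknown models fall back to the default bucket."""
    rates = _RATES.get(model, _RATES[_DEFAULT_MODEL])
    return (
        input_tokens * rates.input_per_mtok
        + output_tokens * rates.output_per_mtok
    ) / 1_000_000
```

Keeping the fallback bucket explicit in one place also documents which pricing a new model variant silently inherits.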


🟢 Low — tiktoken dependency

The code gracefully falls back if tiktoken is absent, but if it's intended for production use it should appear in pyproject.toml / requirements*.txt. If it's genuinely optional (CI can run without it), the fallback is sufficient — just make sure the dependency intent is explicit somewhere.
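
For reference, an optional-dependency counter can be as small as the sketch below. It is not the adapter's code: the `cl100k_base` encoding and the four-characters-per-token fallback are assumptions, and since tiktoken ships OpenAI encodings, either path is only an approximation for Claude models.

```python
from typing import Callable


def make_token_counter() -> Callable[[str], int]:
    """Return a token counter, preferring tiktoken when it is usable."""
    try:
        import tiktoken  # optional dependency

        encoding = tiktoken.get_encoding("cl100k_base")
        # disallowed_special=() treats special-token text as ordinary text
        # instead of raising on chat content that happens to contain it.
        return lambda text: len(encoding.encode(text, disallowed_special=()))
    except Exception:
        # Covers a missing package and a failed encoding download alike.
        return lambda text: len(text) // 4
```

If tiktoken is meant to be required in production, the broad fallback should probably log a warning so a silent downgrade to the heuristic does not go unnoticed.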


✅ What works well

  • The read-only tool restriction (STREAMING_SAFE_TOOLS) with a defense-in-depth check inside _execute_tool is the right call for interactive sessions.
  • Interrupt support is well-placed (checked between every SDK chunk and before each tool execution).
  • Lazy AsyncAnthropic client initialization avoids import-time side effects.
  • _rebuild_tool_inputs from the final message is the right approach — accumulating input_json_delta fragments manually is error-prone.
  • The 15 unit tests cover the important paths (text, thinking, tool loop, interrupt, persistence, error). The mock patterns are clean and don't over-specify internals.
  • Error events yielded instead of propagated — correct design for a streaming interface.
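
The defense-in-depth check praised above amounts to re-validating the tool name at execution time, even though only safe tools are advertised to the model. A minimal sketch, assuming a synchronous `run_tool(name, input)` callable in place of the project's `execute_tool` and an assumed result shape:

```python
import asyncio
from typing import Any, Callable

STREAMING_SAFE_TOOLS = frozenset({"read_file", "list_files", "search_codebase"})


async def execute_safe_tool(
    name: str,
    tool_input: dict[str, Any],
    run_tool: Callable[[str, dict[str, Any]], str],
) -> dict[str, Any]:
    """Run an allow-listed tool off the event loop; never raise to the caller."""
    if name not in STREAMING_SAFE_TOOLS:
        return {
            "content": f"Tool '{name}' is not allowed in interactive sessions",
            "is_error": True,
        }
    try:
        # Blocking file I/O runs in a worker thread so streaming stays responsive.
        output = await asyncio.to_thread(run_tool, name, tool_input)
    except Exception as exc:
        return {"content": f"Tool failed: {exc}", "is_error": True}
    return {"content": output, "is_error": False}
```

Returning an error result rather than raising lets the model see the failure as a tool_result and recover within the same turn.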

The medium-priority issues (history truncation + missing system prompt) are likely to surface in real usage quickly; the rest are polish/hardening items.

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 6

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@codeframe/core/adapters/streaming_chat.py`:
- Around line 199-200: The adapter is fetching the default oldest 100 rows from
InteractiveSessionRepository; change the call to self._db_repo.get_messages(...)
to request the newest/full history (e.g., pass parameters to get_messages to set
a larger/no limit and order by created_at DESC or a “latest_first” flag) so you
receive the most recent turns, then convert/normalize the rows into
[{"role":..., "content":...}] and apply the existing local truncation method
(e.g., _truncate_history or equivalent) to trim to the desired window. Ensure
you reference get_messages, _db_repo, _session_id and the local truncation
method when making these changes.
- Around line 313-319: Persist the user turn to the DB immediately after
building/truncating the messages and before calling _stream_turn so the user’s
message survives cancellations or streaming/API failures; specifically, after
messages = self._truncate_history(messages) create and save a user-turn record
(same shape used by your persistence layer) and update any in-memory/history
list to include that persisted user turn, then call _stream_turn; modify
_stream_turn (and any code paths that currently persist the turn after
streaming) to only persist/update the assistant content after a
successful/completed turn (so partial/cancelled streams do not overwrite or
create incomplete assistant text), and ensure the same change is applied to the
similar block around the code at the second location (the 337–341 region)
referenced in the comment; keep _load_history and _truncate_history usage
unchanged except to reflect the persisted user turn in history.
- Around line 367-370: When handling the streaming loop (async for sdk_event in
stream) with an interrupt_event, the current branch returns immediately and
never emits the terminal 'DONE' event; change it to mirror the tool-execution
interrupt handling by sending/relaying a terminal DONE event (same event
shape/name used elsewhere) before returning so the relay and client receive a
clear end-of-turn signal; locate the interrupt check that references
interrupt_event/is_set() inside the stream loop and add the DONE emit/relay
(then return) just like the code path around the tool-execution interrupt at the
later branch.
- Around line 416-421: When rebuilding tool call outputs after
stream.get_final_message() (using final_msg, pending_tool_calls and
_rebuild_tool_inputs), ensure you append an assistant message containing the
tool_use blocks before appending the user message containing the tool_result
blocks; specifically, construct and insert an assistant message reflecting the
executed tool_use (derived from pending_tool_calls or final_msg content)
immediately prior to creating the user tool_result message so the message
history preserves the required assistant tool_use → user tool_result ordering
(update the logic around where final_msg is processed and where the tool_result
user message is appended).

In `@codeframe/ui/routers/session_chat_ws.py`:
- Around line 303-312: The code currently uses
Path(session.get("workspace_path") or ".") which falls back to the server CWD;
instead, validate that session.get("workspace_path") is present and reject the
request if it's missing. Modify the handler before calling
session_chat_manager.get_interrupt_event/_run_streaming_adapter to check
session.get("workspace_path") (or a dedicated variable), and if it is None/empty
send an error response or close the websocket with a clear error (e.g.,
raise/return an HTTP/WebSocket error) so no adapter_task is created with a "."
workspace_path; keep all references to session_id, interrupt_event,
db.interactive_sessions and _run_streaming_adapter unchanged except for gating
them behind this validation.

In `@tests/core/test_streaming_chat.py`:
- Around line 14-23: The test file imports unused symbols causing lint failures:
remove the unused imports pytest_asyncio and ChatEvent from
tests/core/test_streaming_chat.py; update the import block that currently
imports ChatEvent, ChatEventType, StreamingChatAdapter to only import
ChatEventType and StreamingChatAdapter (and remove the separate pytest_asyncio
import), then run the tests/linter to confirm Ruff passes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f5e70dd3-7a60-45ed-8196-268ef16892f6

📥 Commits

Reviewing files that changed from the base of the PR and between 8eefe6f and bdef057.

📒 Files selected for processing (4)
  • codeframe/core/adapters/__init__.py
  • codeframe/core/adapters/streaming_chat.py
  • codeframe/ui/routers/session_chat_ws.py
  • tests/core/test_streaming_chat.py

Comment on lines +199 to +200
rows = self._db_repo.get_messages(self._session_id)
return [{"role": r["role"], "content": r["content"]} for r in rows]

Contributor

⚠️ Potential issue | 🟠 Major

Load the latest history window, not the first 100 rows.

InteractiveSessionRepository.get_messages() defaults to LIMIT 100 with ORDER BY created_at, so once a session grows past 100 messages this adapter will replay the oldest context and drop the most recent turns before _truncate_history() even runs. Ask the repo for the newest/full history here, then trim locally.
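
The usual pattern for "newest N, in chronological order" is to select in descending order with a limit and reverse in Python. The sketch below uses sqlite3 directly with an assumed `session_messages` schema (`id`, `session_id`, `role`, `content`, `created_at`); the real repository method and columns may differ.

```python
import sqlite3


def load_recent_messages(
    conn: sqlite3.Connection, session_id: str, limit: int = 100
) -> list[dict[str, str]]:
    """Return the newest `limit` messages for a session, oldest first."""
    rows = conn.execute(
        "SELECT role, content FROM session_messages "
        "WHERE session_id = ? "
        "ORDER BY created_at DESC, id DESC LIMIT ?",
        (session_id, limit),
    ).fetchall()
    # Rows arrive newest-first; reverse them for the API's chronological order.
    return [{"role": role, "content": content} for role, content in reversed(rows)]
```

The secondary sort on `id` is a tie-breaker for messages that share a timestamp, which is common when a user and assistant row are written in the same turn.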

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/core/adapters/streaming_chat.py` around lines 199 - 200, The
adapter is fetching the default oldest 100 rows from
InteractiveSessionRepository; change the call to self._db_repo.get_messages(...)
to request the newest/full history (e.g., pass parameters to get_messages to set
a larger/no limit and order by created_at DESC or a “latest_first” flag) so you
receive the most recent turns, then convert/normalize the rows into
[{"role":..., "content":...}] and apply the existing local truncation method
(e.g., _truncate_history or equivalent) to trim to the desired window. Ensure
you reference get_messages, _db_repo, _session_id and the local truncation
method when making these changes.

Comment on lines +313 to +319
# Load history from DB if caller didn't supply it
if not history:
    history = self._load_history()

# Build the message list for this turn
messages: list[dict] = list(history) + [{"role": "user", "content": content}]
messages = self._truncate_history(messages)

Contributor

⚠️ Potential issue | 🟠 Major

Persist the user turn before entering the stream.

The only DB write happens after _stream_turn() returns. _receive() in codeframe/ui/routers/session_chat_ws.py cancels any in-flight adapter before starting the next prompt, so cancelled or API-failed turns lose the user's message entirely, and interrupted turns can still write a partial assistant reply. Split this into “persist user upfront” and “persist assistant only after a completed turn.”

Also applies to: 337-341
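
The split being asked for can be sketched independently of the adapter. Everything here is hypothetical scaffolding: `run_turn`, the `stream_turn` callable, and the `add_message(session_id, role, content)` signature are assumptions for illustration.

```python
import asyncio
from typing import AsyncIterator, Callable, Optional, Protocol


class MessageRepo(Protocol):
    def add_message(self, session_id: str, role: str, content: str) -> None: ...


async def run_turn(
    repo: MessageRepo,
    session_id: str,
    content: str,
    stream_turn: Callable[[str], AsyncIterator[str]],
    interrupt_event: Optional[asyncio.Event] = None,
) -> Optional[str]:
    """Persist the user turn first; persist the reply only if the turn completes."""
    repo.add_message(session_id, "user", content)

    parts: list[str] = []
    async for delta in stream_turn(content):
        if interrupt_event is not None and interrupt_event.is_set():
            return None  # interrupted: no partial assistant text is saved
        parts.append(delta)

    reply = "".join(parts)
    repo.add_message(session_id, "assistant", reply)
    return reply
```

If the stream raises or the task is cancelled, control never reaches the second `add_message`, so the DB holds the user's message and nothing half-finished from the assistant.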

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/core/adapters/streaming_chat.py` around lines 313 - 319, Persist
the user turn to the DB immediately after building/truncating the messages and
before calling _stream_turn so the user’s message survives cancellations or
streaming/API failures; specifically, after messages =
self._truncate_history(messages) create and save a user-turn record (same shape
used by your persistence layer) and update any in-memory/history list to include
that persisted user turn, then call _stream_turn; modify _stream_turn (and any
code paths that currently persist the turn after streaming) to only
persist/update the assistant content after a successful/completed turn (so
partial/cancelled streams do not overwrite or create incomplete assistant text),
and ensure the same change is applied to the similar block around the code at
the second location (the 337–341 region) referenced in the comment; keep
_load_history and _truncate_history usage unchanged except to reflect the
persisted user turn in history.

Comment on lines +367 to +370
async for sdk_event in stream:
    # Honour interrupt between chunks
    if interrupt_event and interrupt_event.is_set():
        return

Contributor

⚠️ Potential issue | 🟠 Major

Emit DONE when the interrupt trips mid-stream.

This branch returns without a terminal event, unlike the tool-execution interrupt path at Lines 441-443. The relay keeps waiting and the client never gets a clear end-of-turn signal for the interrupted response.

🔚 Proposed fix
                     # Honour interrupt between chunks
                     if interrupt_event and interrupt_event.is_set():
-                        return
+                        yield ChatEvent(type=ChatEventType.DONE)
+                        return
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- async for sdk_event in stream:
-     # Honour interrupt between chunks
-     if interrupt_event and interrupt_event.is_set():
-         return
+ async for sdk_event in stream:
+     # Honour interrupt between chunks
+     if interrupt_event and interrupt_event.is_set():
+         yield ChatEvent(type=ChatEventType.DONE)
+         return
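
The behaviour this suggestion aims for can be checked in isolation with a toy generator. This is a sketch with plain dicts standing in for ChatEvent, and `stream_with_interrupt` is a hypothetical name, not a function in the adapter.

```python
import asyncio
from typing import Any, AsyncIterator


async def stream_with_interrupt(
    chunks: AsyncIterator[str],
    interrupt_event: asyncio.Event,
) -> AsyncIterator[dict[str, Any]]:
    """Relay text chunks, always finishing with exactly one terminal event."""
    async for chunk in chunks:
        # Honour the interrupt between chunks, before emitting the next one.
        if interrupt_event.is_set():
            break
        yield {"type": "text_delta", "content": chunk}
    # Reached on both normal completion and interrupt, so the consumer
    # always receives an end-of-turn signal.
    yield {"type": "done"}
```

Using `break` and a single trailing `yield` keeps the terminal event in one place, which makes it harder for a future early-exit path to forget it.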
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/core/adapters/streaming_chat.py` around lines 367 - 370, When
handling the streaming loop (async for sdk_event in stream) with an
interrupt_event, the current branch returns immediately and never emits the
terminal 'DONE' event; change it to mirror the tool-execution interrupt handling
by sending/relaying a terminal DONE event (same event shape/name used elsewhere)
before returning so the relay and client receive a clear end-of-turn signal;
locate the interrupt check that references interrupt_event/is_set() inside the
stream loop and add the DONE emit/relay (then return) just like the code path
around the tool-execution interrupt at the later branch.

Comment on lines +416 to +421
final_msg = stream.get_final_message()
stop_reason = final_msg.stop_reason or "end_turn"

# Rebuild tool inputs from final message (more reliable than streaming)
if pending_tool_calls and hasattr(final_msg, "content"):
    _rebuild_tool_inputs(final_msg.content, pending_tool_calls)

Contributor

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

In Anthropic's Messages API tool-use flow, must the follow-up request include the assistant message containing the tool_use content blocks immediately before the user tool_result blocks?

💡 Result:

Yes, in Anthropic's Messages API tool-use flow for client tools, the follow-up request must include the previous assistant message (containing the tool_use content blocks) immediately before the new user message with the tool_result blocks. This is required because the API is stateless, so every request includes the full conversation history, and the validation enforces that each tool_use block is followed directly by its corresponding tool_result in the subsequent user message without intervening messages. Official documentation states: "Tool result blocks must immediately follow their corresponding tool use blocks in the message history. You cannot include any messages between the assistant's tool use message and the user's tool result message." API errors explicitly confirm this: "tool_use ids were found without tool_result blocks immediately after... Each tool_use block must have a corresponding tool_result block in the next [message]."

Citations:


🏁 Script executed:

# First, find and read the streaming_chat.py file
fd streaming_chat.py

Repository: frankbria/codeframe

Length of output: 138


🏁 Script executed:

# Read the file to verify the code at lines 416-421 and 466-469
cat -n codeframe/core/adapters/streaming_chat.py | sed -n '400,480p'

Repository: frankbria/codeframe

Length of output: 4150


🏁 Script executed:

# Let me look at more context around the code to understand the data flow better
# Check what happens with the assistant message and tool_use blocks
cat -n codeframe/core/adapters/streaming_chat.py | sed -n '350,400p'

Repository: frankbria/codeframe

Length of output: 2734


🏁 Script executed:

# Check what final_msg contains and how it's structured
# Look for how final_msg is used elsewhere in the file
rg -A 5 "final_msg\." codeframe/core/adapters/streaming_chat.py | head -50

Repository: frankbria/codeframe

Length of output: 997


🏁 Script executed:

# Let's verify what fields are available on final_msg by checking the Message type
# Also check if there's an existing assistant message being built elsewhere
rg "role.*assistant" codeframe/core/adapters/streaming_chat.py

Repository: frankbria/codeframe

Length of output: 92


🏁 Script executed:

# Find the context where role="assistant" appears
rg -B 5 -A 5 'role="assistant"' codeframe/core/adapters/streaming_chat.py

Repository: frankbria/codeframe

Length of output: 393


🏁 Script executed:

# Check the Anthropic SDK to understand Message structure
# Look for imports and how messages are used
rg "from anthropic" codeframe/core/adapters/streaming_chat.py

Repository: frankbria/codeframe

Length of output: 111


🏁 Script executed:

# Let's check what the Message API returns and confirm final_msg has content with tool_use blocks
# Also check if there's any test that shows how messages should be structured
cat -n tests/core/test_streaming_chat.py | head -100

Repository: frankbria/codeframe

Length of output: 3463


🏁 Script executed:

# Continue reading the test to see how messages are used and what the expected format is
cat -n tests/core/test_streaming_chat.py | sed -n '100,200p'

Repository: frankbria/codeframe

Length of output: 4222


🏁 Script executed:

# Look for tests that involve tool calls and message structure
cat -n tests/core/test_streaming_chat.py | sed -n '200,350p'

Repository: frankbria/codeframe

Length of output: 6376


🏁 Script executed:

# Continue reading the tool call test
cat -n tests/core/test_streaming_chat.py | sed -n '350,450p'

Repository: frankbria/codeframe

Length of output: 4547


🏁 Script executed:

# Continue reading the persistence test to understand message appending
cat -n tests/core/test_streaming_chat.py | sed -n '450,550p'

Repository: frankbria/codeframe

Length of output: 2023


Append the assistant tool_use message before the tool_result user message.

When the API returns tool_use blocks and you execute them, the follow-up request must include both the assistant message (containing the tool_use blocks) and the user message (containing the tool_result blocks). Currently, only the tool_result user message is appended at lines 467–469. The next API request will have ... tool_result with no preceding tool_use block, violating Anthropic's requirement that "tool_use blocks must have corresponding tool_result blocks immediately after" in the message history.

🔁 Proposed fix
         while True:
             # Track tool calls seen in this API turn for the follow-up message
             pending_tool_calls: list[dict] = []  # {id, name, input, result}
             active_tool: dict | None = None  # buffering the current tool_use block
             stop_reason = "end_turn"
+            assistant_content = None
@@
                     elif event_type == "message_stop":
@@
                         final_msg = stream.get_final_message()
                         stop_reason = final_msg.stop_reason or "end_turn"
+                        assistant_content = getattr(final_msg, "content", None)
@@
             # Append the tool results as a user message for the next turn
             current_messages = current_messages + [
-                {"role": "user", "content": tool_result_blocks}
+                {"role": "assistant", "content": assistant_content or []},
+                {"role": "user", "content": tool_result_blocks},
             ]

Also applies to: 466–469
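
To make the required ordering concrete, here is a small helper that builds the follow-up history. It is a sketch, not the adapter's code: `append_tool_round` is a hypothetical name, and it assumes the assistant content has already been converted to plain dicts (the SDK returns typed block objects, so the real code would need a conversion step).

```python
from typing import Any


def append_tool_round(
    messages: list[dict[str, Any]],
    assistant_content: list[dict[str, Any]],
    tool_results: dict[str, str],
) -> list[dict[str, Any]]:
    """Append the assistant tool_use message, then the matching tool_result message."""
    tool_use_ids = [
        block["id"] for block in assistant_content if block.get("type") == "tool_use"
    ]
    missing = [tool_id for tool_id in tool_use_ids if tool_id not in tool_results]
    if missing:
        raise ValueError(f"no tool_result for tool_use ids: {missing}")

    result_blocks = [
        {"type": "tool_result", "tool_use_id": tool_id, "content": tool_results[tool_id]}
        for tool_id in tool_use_ids
    ]
    return messages + [
        {"role": "assistant", "content": assistant_content},
        {"role": "user", "content": result_blocks},
    ]
```

Failing fast on a missing result is deliberate: the API would reject the request anyway, and the local error names the offending ids.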

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/core/adapters/streaming_chat.py` around lines 416 - 421, When
rebuilding tool call outputs after stream.get_final_message() (using final_msg,
pending_tool_calls and _rebuild_tool_inputs), ensure you append an assistant
message containing the tool_use blocks before appending the user message
containing the tool_result blocks; specifically, construct and insert an
assistant message reflecting the executed tool_use (derived from
pending_tool_calls or final_msg content) immediately prior to creating the user
tool_result message so the message history preserves the required assistant
tool_use → user tool_result ordering (update the logic around where final_msg is
processed and where the tool_result user message is appended).

Comment on lines 303 to +312
  interrupt_event = await session_chat_manager.get_interrupt_event(session_id)
+ workspace_path = Path(session.get("workspace_path") or ".")
  adapter_task[0] = asyncio.create_task(
-     _run_streaming_adapter(session_id, content, token_queue, interrupt_event)
+     _run_streaming_adapter(
+         session_id,
+         content,
+         token_queue,
+         interrupt_event,
+         db.interactive_sessions,
+         workspace_path,

⚠️ Potential issue | 🔴 Critical

Don’t fall back to the server CWD for workspace-scoped tools.

If session["workspace_path"] is missing, Path(... or ".") scopes read_file, list_files, and search_codebase to whatever directory the process started in. That breaks workspace isolation and can expose the wrong tree. Reject the request unless a concrete workspace path is present instead of silently defaulting to ".". As per coding guidelines, "All API endpoints must require workspace_path query parameter".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/ui/routers/session_chat_ws.py` around lines 303 - 312, The code
currently uses Path(session.get("workspace_path") or ".") which falls back to
the server CWD; instead, validate that session.get("workspace_path") is present
and reject the request if it's missing. Modify the handler before calling
session_chat_manager.get_interrupt_event/_run_streaming_adapter to check
session.get("workspace_path") (or a dedicated variable), and if it is None/empty
send an error response or close the websocket with a clear error (e.g.,
raise/return an HTTP/WebSocket error) so no adapter_task is created with a "."
workspace_path; keep all references to session_id, interrupt_event,
db.interactive_sessions and _run_streaming_adapter unchanged except for gating
them behind this validation.
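
A minimal sketch of the gating described above, assuming the session is a plain dict. The names `resolve_workspace_path` and `MissingWorkspaceError` are hypothetical and not part of this PR; the caller would catch the error and close the WebSocket before creating any adapter task.

```python
from pathlib import Path


class MissingWorkspaceError(ValueError):
    """Hypothetical error for a session that carries no usable workspace_path."""


def resolve_workspace_path(session: dict) -> Path:
    """Return the session's workspace path, refusing to default to the CWD."""
    raw = session.get("workspace_path")
    # Treat None, "" and whitespace-only values as missing.
    if not raw or not str(raw).strip():
        raise MissingWorkspaceError("session has no workspace_path")
    return Path(raw)
```

With this shape, a missing path fails loudly instead of silently scoping the read-only tools to whatever directory the server started in.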

Comment thread tests/core/test_streaming_chat.py
- _truncate_history: drop in pairs to avoid stranding assistant message
  at index 0 (Anthropic API rejects messages not starting with user role)
- _stream_turn: add system= prompt describing CodeFrame context and
  read-only tool constraints
- session_chat_ws: log warning when workspace_path is missing from session
- _estimate_cost: add last-verified date to pricing table comment
- tests: add TestTruncateHistory covering pair-drop and user-first invariant
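
The pair-drop rule in the first bullet can be sketched as follows. This is an illustration under stated assumptions, not the code in this PR: it assumes the history alternates user/assistant starting with a user message, and `count_tokens` is a hypothetical callable standing in for the tiktoken-based estimate.

```python
from typing import Callable


def truncate_history(
    messages: list[dict],
    budget: int,
    count_tokens: Callable[[dict], int],
) -> list[dict]:
    """Drop the oldest user/assistant pairs until the history fits the budget."""
    costs = [count_tokens(m) for m in messages]
    total = sum(costs)
    start = 0
    # Advance two messages at a time so the kept slice still starts with "user";
    # always keep at least the final message.
    while total > budget and start + 2 <= len(messages) - 1:
        total -= costs[start] + costs[start + 1]
        start += 2
    # One slice at the end instead of re-slicing on every iteration.
    return messages[start:]
```

Tracking a start index and slicing once also avoids repeated list copies while trimming.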
@claude

claude Bot commented Mar 31, 2026

Review: StreamingChatAdapter (503)

Solid implementation replacing the echo stub with real Anthropic streaming. Clean architecture, good safe-tools allowlist, comprehensive test coverage. Issues to address before merge:

BUG: Missing DONE event on interrupt. When interrupt_event fires inside the _stream_turn loop, the method returns without yielding DONE. The old stub sent type:done on interrupt, so a WS consumer that waits for that signal before closing will hang on this path. Fix: yield DONE before each early return, or push it onto the queue unconditionally after the async-for loop in _run_streaming_adapter.
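
A minimal sketch of the second option (emit a terminal event from the relay), assuming events are plain dicts with a "type" key. `stream_events` and `relay` are hypothetical stand-ins for the adapter and `_run_streaming_adapter`, not the code in this PR.

```python
import asyncio
from typing import AsyncIterator


async def stream_events(interrupt: asyncio.Event) -> AsyncIterator[dict]:
    """Stand-in adapter: returns early, without a done event, when interrupted."""
    for chunk in ("Hel", "lo"):
        if interrupt.is_set():
            return
        yield {"type": "text_delta", "content": chunk}
    yield {"type": "done"}


async def relay(queue: asyncio.Queue, interrupt: asyncio.Event) -> None:
    """Forward events and guarantee exactly one terminal 'done' on the queue."""
    sent_done = False
    try:
        async for event in stream_events(interrupt):
            sent_done = sent_done or event["type"] == "done"
            await queue.put(event)
    finally:
        if not sent_done:
            # The queue is unbounded here, so put_nowait cannot raise QueueFull.
            queue.put_nowait({"type": "done"})


async def demo() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    interrupt = asyncio.Event()
    interrupt.set()  # simulate an interrupt arriving before the first chunk
    await relay(queue, interrupt)
    return [queue.get_nowait()["type"] for _ in range(queue.qsize())]
```

Putting the guarantee in the relay keeps every early return in the adapter simple, at the cost of the relay tracking whether a done event has already passed through.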

ISSUE: Blocking I/O in async context. _load_history() calls db_repo.get_messages() synchronously inside send_message, which blocks the event loop. Wrap the call in asyncio.to_thread to match the pattern in _persist_turn.
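
A sketch of the suggested wrapping, assuming `get_messages` is an ordinary synchronous method. `FakeRepo` and `load_history` are hypothetical stand-ins used only to make the example runnable.

```python
import asyncio
import time


class FakeRepo:
    """Hypothetical repository whose get_messages blocks like a DB call."""

    def get_messages(self, session_id: str) -> list[dict]:
        time.sleep(0.05)  # simulate blocking I/O
        return [{"role": "user", "content": f"hello from {session_id}"}]


async def load_history(repo: FakeRepo, session_id: str) -> list[dict]:
    """Run the synchronous repository call in a worker thread."""
    return await asyncio.to_thread(repo.get_messages, session_id)
```

While the worker thread waits on the database, the event loop stays free to relay tokens for other sessions.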

ISSUE: Private symbol imports. _READ_FILE_SCHEMA, _LIST_FILES_SCHEMA, and _SEARCH_CODEBASE_SCHEMA are underscore-prefixed private details of tools.py. Export them publicly or move them to a shared schemas module.

ISSUE: input_json_delta silently discarded. The handler just passes. Tool inputs are reconstructed after the fact via _rebuild_tool_inputs, which works, but the silent discard is non-obvious. Add a comment explaining the rationale.

Minor: _truncate_history is O(n^2) due to repeated list slicing. Type annotations are missing on send_message and __init__. Hardcoded pricing in _estimate_cost will drift; add a comment pointing to the Anthropic pricing docs.

Looks good: STREAMING_SAFE_TOOLS read-only allowlist is the right security boundary. asyncio.to_thread usage for _persist_turn and _execute_tool is correct. Tool-use loop interrupt checks are in the right place. 15 tests cover all event types cleanly. core/adapters/ placement keeps core headless.

@claude

claude Bot commented Mar 31, 2026

CodeFRAME Development Guidelines

Last updated: 2026-03-09

Product Vision

CodeFrame is a project delivery system: Think → Build → Prove → Ship.

It owns the edges of the AI coding pipeline — everything BEFORE code gets written (PRD, specification, task decomposition) and everything AFTER (verification gates, quality memory, deployment). The actual code writing is delegated to frontier coding agents (Claude Code, Codex, OpenCode) that are better at it than any custom agent.

CodeFrame does not compete with coding agents. It orchestrates them.

THINK:  cf prd generate → cf prd stress-test → cf tasks generate
BUILD:  cf work start --engine claude-code  (or codex, opencode, built-in)
PROVE:  cf proof run  (9-gate evidence-based quality system)
SHIP:   cf pr create → cf pr merge
LOOP:   Glitch → cf proof capture → New REQ → Enforced forever

Status: Phase 1 ✅ | Phase 2 ✅ | Phase 2.5 ✅ — CLI workflow, server layer, and ReAct agent complete. Agent adapter architecture (#408) and PROOF9 quality system (#422) are next priorities. See docs/V2_STRATEGIC_ROADMAP.md for the full plan.

If you are an agent working in this repo: do not improvise architecture. Follow the documents listed below.


Primary Contract (MUST FOLLOW)

  1. Golden Path: docs/GOLDEN_PATH.md
    The only workflow we build until it works end-to-end.

  2. Refactor Plan: docs/REFACTOR_PLAN_FOR_AGENT.md
    Step-by-step refactor instructions.

  3. Command Tree + Module Mapping: docs/CLI_WIREFRAME.md
    The authoritative map from CLI commands → core modules/functions.

  4. Agent Implementation: docs/AGENT_IMPLEMENTATION_TASKS.md
    Tracks the agent system components (all complete).

  5. Strategic Roadmap: docs/V2_STRATEGIC_ROADMAP.md
    5-phase plan from CLI to multi-agent.

Rule 0: If a change does not directly support the Think → Build → Prove → Ship pipeline, do not implement it.

Strategic Priority (Phase 4)

The next major architectural work is the Agent Adapter Architecture (#408):


Current Reality (Phase 1, 2 & 2.5 Complete)

What's Working Now

  • Full agent execution: cf work start <task-id> --execute (uses ReAct engine by default)
  • Engine selection: --engine react (default) or --engine plan (legacy)
  • Verbose mode: cf work start <task-id> --execute --verbose shows detailed progress
  • Dry run mode: cf work start <task-id> --execute --dry-run
  • Self-correction loop: Agent automatically fixes failing verification gates (up to 5 attempts with ReAct)
  • FAILED task status: Tasks can transition to FAILED for proper error visibility
  • Tech stack configuration: cf init . --detect auto-detects tech stack from project files
  • Project preferences: Agent loads AGENTS.md or CLAUDE.md for per-project configuration
  • Stall detection: Thread-based monitor with configurable recovery (--stall-action blocker|retry|fail)
  • Blocker detection: Agent creates blockers when stuck
  • Verification gates: Ruff/pytest checks after file changes
  • State persistence: Pause/resume across sessions
  • Batch execution: cf work batch run with serial/parallel/auto strategies
  • Task dependencies: depends_on field with dependency graph analysis
  • LLM dependency inference: --strategy auto analyzes task descriptions
  • Automatic retry: --retry N for failed task recovery
  • Batch resume: Re-run failed/blocked tasks from previous batches
  • Task scheduling: cf schedule show/predict/bottlenecks with CPM-based scheduling
  • Task templates: cf templates list/show/apply with 7 builtin templates
  • Effort estimation: Tasks support estimated_hours field for scheduling
  • Environment validation: cf env check/install/doctor validates tools and dependencies
  • GitHub PR workflow: cf pr create/status/checks/merge for PR management
  • Task self-diagnosis: cf work diagnose <task-id> analyzes failed tasks
  • 70+ integration tests: Comprehensive CLI test coverage
  • REST API: Full v2 API with 16 router modules (see Phase 2 below)
  • API authentication: API key auth with scopes (read/write/admin)
  • Rate limiting: Configurable per-endpoint rate limits
  • Real-time streaming: SSE for task execution events
  • OpenAPI documentation: Full Swagger/ReDoc at /docs and /redoc

v2 Architecture (current)

  • Core-first: Domain logic lives in codeframe/core/ (headless, no FastAPI imports)
  • CLI-first: Golden Path works without any running FastAPI server
  • Adapters: LLM providers in codeframe/adapters/llm/
  • Server/UI optional: FastAPI and UI are thin adapters over core

v1 Legacy

  • FastAPI server + WebSockets + React/Next.js dashboard retained for reference
  • Do not build toward v1 patterns during Golden Path work

Repository Structure

codeframe/
├── core/                    # Headless domain + orchestration (NO FastAPI imports)
│   ├── react_agent.py      # ReAct agent (default engine) - observe-think-act loop
│   ├── tools.py            # Tool definitions for ReAct agent (7 tools)
│   ├── editor.py           # Search-replace file editor with fuzzy matching
│   ├── agent.py            # Legacy plan-based agent (--engine plan)
│   ├── planner.py          # LLM-powered implementation planning (plan engine)
│   ├── executor.py         # Code execution engine with rollback (plan engine)
│   ├── context.py          # Task context loader with relevance scoring
│   ├── tasks.py            # Task management with depends_on field
│   ├── blockers.py         # Human-in-the-loop blocker system
│   ├── runtime.py          # Run lifecycle management
│   ├── conductor.py        # Batch orchestration with worker pool
│   ├── dependency_graph.py # DAG operations and execution planning
│   ├── dependency_analyzer.py # LLM-based dependency inference
│   ├── gates.py            # Verification gates (ruff, pytest, BUILD)
│   ├── fix_tracker.py      # Fix attempt tracking for loop prevention
│   ├── quick_fixes.py      # Pattern-based fixes without LLM
│   ├── agents_config.py    # AGENTS.md/CLAUDE.md preference loading
│   ├── workspace.py        # Workspace initialization
│   ├── prd.py              # PRD management
│   ├── events.py           # Event emission
│   ├── state_machine.py    # Task status transitions
│   ├── environment.py      # Environment validation and tool detection
│   ├── installer.py        # Automatic tool installation
│   ├── diagnostics.py      # Failed task analysis
│   ├── diagnostic_agent.py # AI-powered task diagnosis
│   ├── credentials.py      # API key and credential management
│   ├── stall_detector.py   # Synchronous stall detector + StallAction enum + StallDetectedError
│   ├── stall_monitor.py    # Thread-based stall watchdog with callback
│   ├── streaming.py        # Real-time output streaming for cf work follow
│   └── ...
├── adapters/
│   └── llm/                # LLM provider adapters
│       ├── base.py         # Protocol + ModelSelector + Purpose enum
│       ├── anthropic.py    # Anthropic Claude provider
│       └── mock.py         # Mock provider for testing
├── cli/
│   └── app.py              # Typer CLI entry + subcommands
├── ui/                     # FastAPI server (Phase 2 - thin adapter over core)
│   ├── server.py           # FastAPI app with OpenAPI configuration
│   ├── models.py           # Pydantic request/response models
│   ├── dependencies.py     # Shared dependencies (workspace, auth)
│   └── routers/            # API route handlers
│       ├── blockers_v2.py  # Blocker CRUD
│       ├── tasks_v2.py     # Task management + streaming
│       ├── prd_v2.py       # PRD management + versioning
│       ├── workspace_v2.py # Workspace init and status
│       ├── batches_v2.py   # Batch execution
│       ├── streaming_v2.py # SSE event streaming
│       ├── api_key_v2.py   # API key management
│       └── ...             # 16 router modules total
├── lib/                    # Shared utilities
│   ├── rate_limiter.py     # SlowAPI rate limiting
│   └── audit_logger.py     # Request audit logging
├── auth/                   # Authentication
│   ├── api_key_service.py  # API key creation/validation
│   └── dependencies.py     # Auth dependencies
├── config/
│   └── rate_limits.py      # Rate limit configuration
└── server/                 # Legacy server code (reference only)

web-ui/                     # Frontend (legacy, reference only)
tests/
├── core/                   # Core module tests
│   ├── test_agent.py
│   ├── test_executor.py
│   ├── test_planner.py
│   ├── test_context.py
│   ├── test_conductor.py
│   ├── test_dependency_graph.py
│   ├── test_dependency_analyzer.py
│   ├── test_task_dependencies.py
│   └── ...
└── adapters/
    └── test_llm.py

Architecture Rules (non-negotiable)

1) Core must be headless

codeframe/core/** must NOT import:

  • FastAPI
  • WebSocket frameworks
  • HTTP request/response objects
  • UI modules

Core is allowed to:

  • read/write durable state (SQLite/filesystem)
  • run orchestration/worker loops
  • emit events to an append-only event log
  • call adapters via interfaces (LLM, git, fs)

2) CLI must not require a server

Golden Path commands must work from the CLI with no server running.

FastAPI is optional and must be started explicitly (e.g., codeframe serve) and must wrap core.

3) Agent state transitions flow through runtime

Critical pattern discovered during implementation:

  • Agent (agent.py) manages its own AgentState (IDLE, PLANNING, EXECUTING, BLOCKED, COMPLETED, FAILED)
  • Runtime (runtime.py) handles all TaskStatus transitions (BACKLOG, READY, IN_PROGRESS, DONE, BLOCKED)
  • Agent does NOT call tasks.update_status() - runtime does this based on agent state

This separation prevents duplicate state transitions (e.g., DONE→DONE, BLOCKED→BLOCKED errors).
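
A minimal sketch of this separation, using the state names listed above plus the FAILED task status mentioned earlier. The enum values and `resolve_transition` are assumptions for illustration, not the actual runtime.py API.

```python
from enum import Enum
from typing import Optional


class AgentState(Enum):
    IDLE = "IDLE"
    PLANNING = "PLANNING"
    EXECUTING = "EXECUTING"
    BLOCKED = "BLOCKED"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


class TaskStatus(Enum):
    BACKLOG = "BACKLOG"
    READY = "READY"
    IN_PROGRESS = "IN_PROGRESS"
    DONE = "DONE"
    BLOCKED = "BLOCKED"
    FAILED = "FAILED"


# Only terminal agent states map to a task transition.
_RESULT_TO_STATUS = {
    AgentState.COMPLETED: TaskStatus.DONE,
    AgentState.BLOCKED: TaskStatus.BLOCKED,
    AgentState.FAILED: TaskStatus.FAILED,
}


def resolve_transition(
    current: TaskStatus, agent_state: AgentState
) -> Optional[TaskStatus]:
    """Return the task status the runtime should apply, or None for no change."""
    target = _RESULT_TO_STATUS.get(agent_state)
    if target is None or target is current:
        return None  # non-terminal state, or a duplicate such as DONE -> DONE
    return target
```

Because the agent only reports its own state and a single runtime function decides the task transition, a duplicate such as DONE→DONE is filtered in one place.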

4) Legacy can be read, not depended on

Legacy code is reference material.

  • Copy/simplify logic into core when useful
  • Do NOT import legacy UI/server modules into core
  • Do NOT "fix the UI" during Golden Path work

5) Keep commits runnable

At all times:

  • codeframe --help works
  • Golden Path command stubs can run
  • Avoid breaking the repo with large renames/moves

Agent System Architecture

Components

| Component | File | Purpose |
|---|---|---|
| ReactAgent | core/react_agent.py | Default engine: observe-think-act loop with tool use |
| Tools | core/tools.py | 7 agent tools: read/edit/create file, run command/tests, search, list |
| Editor | core/editor.py | Search-replace editor with 4-level fuzzy matching |
| Stall Detector | core/stall_detector.py | Synchronous stall check + StallAction enum + StallDetectedError |
| Stall Monitor | core/stall_monitor.py | Thread-based watchdog with callback (integrated into ReactAgent) |
| LLM Adapter | adapters/llm/base.py | Protocol, ModelSelector, Purpose enum |
| Anthropic Provider | adapters/llm/anthropic.py | Claude integration with streaming |
| Mock Provider | adapters/llm/mock.py | Testing with call tracking |
| Context Loader | core/context.py | Codebase scanning, relevance scoring |
| Planner | core/planner.py | Task → ImplementationPlan via LLM (plan engine) |
| Executor | core/executor.py | File ops, shell commands, rollback (plan engine) |
| Agent (legacy) | core/agent.py | Plan-based orchestration (--engine plan) |
| Runtime | core/runtime.py | Run lifecycle, engine selection, agent invocation |
| Conductor | core/conductor.py | Batch orchestration, worker pool |
| Dependency Graph | core/dependency_graph.py | DAG operations, topological sort |
| Dependency Analyzer | core/dependency_analyzer.py | LLM-based dependency inference |
| Environment Validator | core/environment.py | Tool detection and validation |
| Installer | core/installer.py | Automatic tool installation |
| Diagnostics | core/diagnostics.py | Failed task analysis |
| Diagnostic Agent | core/diagnostic_agent.py | AI-powered task diagnosis |
| Credentials | core/credentials.py | API key and credential management |
| Event Publisher | core/streaming.py | Real-time SSE event distribution |
| API Key Service | auth/api_key_service.py | API key CRUD and validation |
| Rate Limiter | lib/rate_limiter.py | Per-endpoint rate limiting |

Model Selection Strategy

Task-based heuristic via Purpose enum:

  • PLANNING → claude-sonnet-4-20250514 (complex reasoning)
  • EXECUTION → claude-sonnet-4-20250514 (balanced)
  • GENERATION → claude-haiku-4-20250514 (fast/cheap)

Future: cf tasks set provider <id> <provider> for per-task override.
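
The heuristic above amounts to a lookup table keyed by purpose. The sketch below copies the model IDs from the list as written; the `select_model` helper and its `override` parameter are hypothetical, loosely mirroring the per-task override described as future work.

```python
from enum import Enum
from typing import Optional


class Purpose(Enum):
    PLANNING = "planning"
    EXECUTION = "execution"
    GENERATION = "generation"


# Model IDs copied from the list above; treat them as configuration, not constants.
MODEL_BY_PURPOSE = {
    Purpose.PLANNING: "claude-sonnet-4-20250514",
    Purpose.EXECUTION: "claude-sonnet-4-20250514",
    Purpose.GENERATION: "claude-haiku-4-20250514",
}


def select_model(purpose: Purpose, override: Optional[str] = None) -> str:
    """Pick a model for a purpose; an explicit override (hypothetical) wins."""
    return override or MODEL_BY_PURPOSE[purpose]
```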

Engine Selection

CodeFRAME supports two execution engines, selected via --engine:

| Engine | Flag | Pattern | Best For |
|---|---|---|---|
| ReAct (default) | --engine react | Observe → Think → Act loop | Most tasks, adaptive execution |
| Plan (legacy) | --engine plan | Plan all steps → Execute sequentially | Well-defined, predictable tasks |

Execution Flow (ReAct — default)

cf work start <id> --execute [--verbose]
    │
    ├── runtime.start_task_run()      # Creates run, transitions task→IN_PROGRESS
    │
    └── runtime.execute_agent(engine="react")
            │
            └── ReactAgent.run(task_id)
                ├── Load context (PRD, codebase, blockers, AGENTS.md, tech_stack)
                ├── Build layered system prompt
                │
                └── Tool-use loop (until complete/blocked/failed):
                    ├── Check stall detector (configurable: retry/blocker/fail)
                    ├── LLM decides next action (tool call)
                    ├── Execute tool: read_file, edit_file, create_file,
                    │   run_command, run_tests, search_codebase, list_files
                    ├── Observe result → feed back to LLM
                    ├── Record activity (resets stall timer)
                    ├── Incremental verification (ruff after file changes)
                    └── Token budget management (3-tier compaction)
                │
                └── Final verification with self-correction (up to 5 retries)
                │
                └── Update run/task status based on agent result
                    ├── COMPLETED → complete_run() → task→DONE
                    ├── BLOCKED → block_run() → task→BLOCKED
                    └── FAILED → fail_run() → task→FAILED

Execution Flow (Plan — legacy, --engine plan)

cf work start <id> --execute --engine plan
    │
    ├── runtime.start_task_run()
    │
    └── runtime.execute_agent(engine="plan")
            │
            ├── agent.run(task_id)
            │   ├── Load context (PRD, codebase, blockers, AGENTS.md)
            │   ├── Create plan via LLM
            │   ├── Execute steps (file create/edit, shell commands)
            │   ├── Run incremental verification (ruff)
            │   ├── Detect blockers (consecutive failures, missing files)
            │   └── Run final verification with SELF-CORRECTION LOOP:
            │       ├── Run all gates (pytest, ruff)
            │       ├── If failed: _attempt_verification_fix()
            │       │   ├── Try ruff --fix for quick lint fixes
            │       │   ├── Use LLM to generate fix plan from errors
            │       │   └── Execute fix steps
            │       └── Retry up to max_attempts (default: 3)
            │
            └── Update run/task status based on agent result
                ├── COMPLETED → complete_run() → task→DONE
                ├── BLOCKED → block_run() → task→BLOCKED
                └── FAILED → fail_run() → task→FAILED

Commands (v2 CLI)

Python (preferred)

Use uv for Python tasks:

uv run pytest
uv run pytest tests/core/  # Core module tests only
uv run ruff check .

CLI (Golden Path)

# Workspace
cf init <repo>                                    # Initialize workspace
cf init <repo> --detect                           # Initialize + auto-detect tech stack
cf init <repo> --tech-stack "Python with uv"      # Initialize + explicit tech stack
cf init <repo> --tech-stack-interactive           # Initialize + interactive setup
cf status

# PRD
cf prd add <file.md>
cf prd show

# Tasks
cf tasks generate          # Uses LLM to generate from PRD
cf tasks list
cf tasks list --status READY
cf tasks show <id>

# Work execution (single task)
cf work start <task-id>                    # Creates run record
cf work start <task-id> --execute          # Runs AI agent (ReAct engine, default)
cf work start <task-id> --execute --engine plan  # Use legacy plan engine
cf work start <task-id> --execute --verbose  # With detailed output
cf work start <task-id> --execute --dry-run  # Preview changes
cf work start <task-id> --execute --stall-timeout 120  # Custom stall timeout (0=disabled)
cf work start <task-id> --execute --stall-action retry  # Recovery: blocker|retry|fail
cf work stop <task-id>                     # Cancel stale run
cf work resume <task-id>                   # Resume blocked work
cf work follow <task-id>                   # Stream real-time output
cf work follow <task-id> --tail 50         # Show last 50 lines then stream

# Batch execution (multiple tasks)
cf work batch run <id1> <id2> ...          # Execute multiple tasks (ReAct default)
cf work batch run --all-ready              # All READY tasks
cf work batch run --all-ready --engine plan  # Use legacy plan engine
cf work batch run --strategy serial        # Serial (default)
cf work batch run --strategy parallel      # Parallel execution
cf work batch run --strategy auto          # LLM-inferred dependencies
cf work batch run --max-parallel 4         # Concurrent limit
cf work batch run --retry 3               # Auto-retry failures
cf work batch status [batch_id]            # Show batch status
cf work batch cancel <batch_id>            # Cancel running batch
cf work batch resume <batch_id>            # Re-run failed tasks

# Blockers
cf blocker list
cf blocker show <id>
cf blocker answer <id> "answer"

# Quality
cf review
cf patch export
cf commit

# State
cf checkpoint create "name"
cf checkpoint list
cf checkpoint restore <id>
cf summary

# Environment validation
cf env check                     # Validate tools and dependencies
cf env install                   # Install missing tools
cf env doctor                    # Comprehensive environment health check

# GitHub PR workflow
cf pr create                     # Create PR from current branch
cf pr status                     # Show PR status
cf pr checks                     # Show CI check results
cf pr merge                      # Merge approved PR

# Diagnostics
cf work diagnose <task-id>       # AI-powered analysis of failed tasks

Note: codeframe serve exists but Golden Path does not depend on it.

Frontend (legacy)

cd web-ui && npm test
cd web-ui && npm run build

Do not expand frontend scope during Golden Path work.


Documentation Navigation

Authoritative (v2)

  • docs/GOLDEN_PATH.md - CLI-first workflow contract
  • docs/REFACTOR_PLAN_FOR_AGENT.md - Step-by-step refactor instructions
  • docs/CLI_WIREFRAME.md - Command → module mapping
  • docs/AGENT_IMPLEMENTATION_TASKS.md - Agent system components
  • docs/V2_STRATEGIC_ROADMAP.md - 5-phase plan from CLI to multi-agent

Agent Architecture (Phase 2.5)

  • docs/AGENT_V3_UNIFIED_PLAN.md - ReAct architecture design and rules
  • docs/REACT_AGENT_ARCHITECTURE.md - Deep-dive: tools, editor, token management
  • docs/REACT_AGENT_ANALYSIS.md - Golden path test run analysis

API Documentation (Phase 2)

  • /docs - Swagger UI (interactive API explorer)
  • /redoc - ReDoc (readable API documentation)
  • /openapi.json - OpenAPI 3.1 specification
  • docs/PHASE_2_DEVELOPER_GUIDE.md - Server layer implementation guide
  • docs/PHASE_2_CLI_API_MAPPING.md - CLI to API endpoint mapping

Legacy (v1 reference only)

These describe old server/UI-driven architecture:

  • SPRINTS.md, sprints/
  • specs/
  • CODEFRAME_SPEC.md
  • v1 feature docs (context/session/auth/UI state management)

What NOT to do (common agent failure modes)

  • Don't add new HTTP endpoints to support the CLI
  • Don't require codeframe serve for CLI workflows
  • Don't implement UI concepts (tabs, panels, progress bars) inside core
  • Don't redesign auth, websockets, or UI state management
  • Don't add multi-providers/model switching features before Golden Path works
  • Don't "clean up the repo" as a goal - only refactor to enable Golden Path
  • Don't update task status from agent.py - let runtime handle transitions

Testing / Demoing CodeFRAME on Sample Projects

When running uv run cf commands against a sample project (e.g., cf-test/) to test or demo CodeFRAME's capabilities, you are observing the CodeFRAME agent's work, not doing the work yourself.

Rules for testing/demo mode:

  • You are evaluating how well the CodeFRAME agent (ReAct or Plan engine) builds the project
  • Do NOT help out, fix errors, or write code on behalf of the CodeFRAME agent
  • Do NOT intervene when the agent makes mistakes — that's data
  • Your job is to report the process: what worked, what failed, how close the agent got
  • Document the agent's output, errors encountered, and final state
  • Assess completion against the PRD/acceptance criteria objectively
  • If the agent gets stuck or fails, report that as a finding — don't rescue it

This applies when using commands like cf work start <id> --execute, cf work batch run, or any command that triggers the AI agent to do implementation work on a target project.


Practical Working Mode for Agents

When implementing anything, do this loop:

  1. Read docs/GOLDEN_PATH.md and confirm the change is required
  2. Find the command in docs/CLI_WIREFRAME.md
  3. Implement core functionality in codeframe/core/
  4. Call it from Typer command in codeframe/cli/
  5. Emit events + persist state
  6. Keep it runnable. Commit.

If you are unsure which direction to take, default to:

  • simpler state
  • fewer dependencies
  • smaller surface area
  • core-first, CLI-first

Recent Updates (2026-03-09)

Stall Detection System (#399, #400, #401)

Complete stall detection and configurable recovery for agent execution:

Components:

  • StallMonitor (core/stall_monitor.py) — Thread-based watchdog polling every 5s
  • StallDetector (core/stall_detector.py) — Synchronous time-tracking primitive
  • StallAction enum — Recovery strategy: RETRY, BLOCKER, FAIL
  • StallDetectedError — Exception for RETRY path (propagates to runtime for retry)

CLI flags:

  • --stall-timeout N — Seconds without tool activity before stall (default: 300, 0=disabled)
  • --stall-action {blocker,retry,fail} — Recovery action (default: blocker)
  • Both flags available on cf work start and cf work batch run

Recovery flow:

  • BLOCKER (default): Creates informative blocker, task → BLOCKED
  • RETRY: Raises StallDetectedError, runtime retries once with fresh agent
  • FAIL: Task transitions directly to FAILED

Config: agent_budget.stall_timeout_s in .codeframe/config.yaml (0 = disabled)
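
A sketch of how a time-tracking detector and the three recovery actions could fit together. The class and enum names come from the text above, but every method name, signature, and return value here is an assumption for illustration, not the actual core/stall_detector.py API.

```python
import time
from enum import Enum
from typing import Callable


class StallAction(Enum):
    BLOCKER = "blocker"
    RETRY = "retry"
    FAIL = "fail"


class StallDetectedError(Exception):
    """Raised on the RETRY path so the caller can retry with a fresh agent."""


class StallDetector:
    def __init__(
        self, timeout_s: float, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self.timeout_s = timeout_s
        self._clock = clock
        self._last_activity = clock()

    def record_activity(self) -> None:
        """Reset the stall timer; called after each tool execution."""
        self._last_activity = self._clock()

    def is_stalled(self) -> bool:
        if self.timeout_s <= 0:  # 0 disables stall detection
            return False
        return self._clock() - self._last_activity >= self.timeout_s


def handle_stall(action: StallAction) -> str:
    """Map a recovery action to the resulting task status name."""
    if action is StallAction.RETRY:
        raise StallDetectedError("no tool activity within the stall timeout")
    return "BLOCKED" if action is StallAction.BLOCKER else "FAILED"
```

Injecting the clock keeps the detector testable without sleeping for the full timeout.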


Phase 2.5 Complete: ReAct Agent Architecture (#355)

Default execution engine switched from plan-based to ReAct (Reasoning + Acting).

What changed:

  • Default engine is now "react" — all cf work start --execute and cf work batch run commands use ReactAgent
  • Legacy plan engine available via --engine plan flag
  • ReactAgent uses iterative tool-use loop (observe → think → act) instead of plan-all-then-execute
  • 7 structured tools: read_file, edit_file, create_file, run_command, run_tests, search_codebase, list_files
  • Search-replace editing with 4-level fuzzy matching (exact → whitespace-normalized → indentation-agnostic → fuzzy)
  • Token budget management with 3-tier compaction
  • Adaptive iteration budget based on task complexity

Phase 2.5 deliverables:

| Phase | Focus | Pipeline Stage | Status |
|---|---|---|---|
| 1 | CLI Completion | Think + Build | Complete |
| 2 | Server Layer | Build (API) | Complete |
| 2.5 | ReAct Agent | Build (execution) | Complete |
| 3 | Web UI Rebuild | All (dashboard) | In Progress |
| 4 | Agent Adapters + Orchestration | Build (delegate to frontier agents) | Next |
| 5 | PROOF9 + Advanced | Prove + Ship (quality memory) | Planned |

Phase 2 Complete: Server Layer (2026-02-03)

Phase 2 deliverables completed:

Server Architecture (Phase 2)

Pattern: Thin adapter over core - server routes delegate to core.* modules.

CLI (typer) ─┬── core.* ─── adapters.*
             │
Server (fastapi) ─┘

V2 Router Modules (16 total):

| Router | Endpoints | Purpose |
|---|---|---|
| blockers_v2 | 5 | Blocker CRUD |
| prd_v2 | 8 | PRD management + versioning |
| tasks_v2 | 12 | Task management + streaming |
| workspace_v2 | 5 | Init, status, tech stack |
| batches_v2 | 5 | Batch execution strategies |
| streaming_v2 | 2 | SSE event streaming |
| api_key_v2 | 4 | API key management |
| discovery_v2 | 5 | PRD discovery sessions |
| checkpoints_v2 | 6 | State checkpoints |
| schedule_v2 | 3 | Task scheduling |
| templates_v2 | 4 | PRD templates |
| git_v2 | 3 | Git operations |
| review_v2 | 2 | Code review |
| pr_v2 | 5 | GitHub PR workflow |
| environment_v2 | 4 | Tool detection |
| proof_v2 | 7 | PROOF9 quality gates + requirements |

API Authentication:

# Create API key
cf auth api-key-create --name "my-key" --scopes read,write

# Use in requests
curl -H "X-API-Key: cf_..." https://api.example.com/api/v2/tasks

Rate Limiting:

  • Default: 100 requests/minute (standard endpoints)
  • Auth endpoints: 10/minute
  • AI endpoints: 20/minute
  • Configurable via RATE_LIMIT_* environment variables

OpenAPI Documentation:

  • Swagger UI: /docs
  • ReDoc: /redoc
  • OpenAPI JSON: /openapi.json

Previous Updates (2026-01-29)

V2 Strategic Roadmap Established

Created comprehensive 5-phase roadmap in docs/V2_STRATEGIC_ROADMAP.md.

Phase 1 Complete: CLI Foundation

All Phase 1 priorities completed:

Environment Validation (cf env)

New commands for validating development environment:

cf env check              # Validate required tools (git, uv, ruff, pytest)
cf env install            # Install missing tools automatically
cf env doctor             # Comprehensive environment health check

Modules:

  • core/environment.py - Tool detection and validation
  • core/installer.py - Cross-platform tool installation

GitHub PR Workflow (cf pr)

Streamlined PR management without leaving the CLI:

cf pr create              # Create PR from current branch
cf pr status              # Show PR status and review state
cf pr checks              # Show CI check results
cf pr merge               # Merge approved PR

Task Self-Diagnosis (cf work diagnose)

AI-powered analysis of failed tasks:

cf work diagnose <task-id>   # Analyze why a task failed

Modules:

  • core/diagnostics.py - Failed task analysis
  • core/diagnostic_agent.py - AI-powered diagnosis

Bug Fixes

GitHub Issue Organization


Previous Updates (2026-01-16)

Phase 3.1: Tech Stack Configuration

Simplified tech stack configuration using natural language descriptions:

  1. tech_stack field on Workspace model - stores natural language description
  2. --detect flag - auto-detects from pyproject.toml, package.json, Cargo.toml, go.mod
  3. --tech-stack flag - explicit tech stack description (e.g., "Rust project with cargo")
  4. --tech-stack-interactive flag - simple prompt for user input (stub for future multi-round)
  5. Agent integration - TaskContext and Planner include tech_stack in LLM prompts
  6. Removed cf config subcommand - tech stack is now part of workspace init

Design philosophy: Instead of structured configuration with specific package managers and frameworks, users describe their stack in natural language. The agent interprets and adapts.

Examples:

cf init . --detect                           # Auto-detect: "Python with uv, pytest, ruff for linting"
cf init . --tech-stack "Rust project using cargo"
cf init . --tech-stack "TypeScript monorepo with pnpm, Next.js, jest"
cf init . --tech-stack-interactive           # Prompts user for description

Future work: Multi-round interactive discovery (bead: codeframe-8d80)


Agent Self-Correction & Observability

Improved agent reliability with automatic error recovery:

  1. Self-correction loop in _run_final_verification() - agent retries up to 3 times
  2. Verbose mode (--verbose / -v) - shows detailed verification/self-correction progress
  3. FAILED task status - tasks transition to FAILED for proper error visibility
  4. Project preferences - agent loads AGENTS.md/CLAUDE.md for per-project config
  5. Fixed fail_run() - now properly transitions task status (was leaving tasks stuck)

Enhanced Self-Correction (Phase 3.4)

Advanced error recovery with loop prevention and smart escalation:

  1. Fix Attempt Tracker (core/fix_tracker.py) - prevents repeating failed fixes

    • Normalizes errors for comparison (removes line numbers, memory addresses)
    • Tracks (error_signature, fix_description) pairs with outcomes
    • Detects escalation patterns (same error 3+ times, same file 3+ times)
  2. Pattern-Based Quick Fixes (core/quick_fixes.py) - fixes common errors without LLM

    • ModuleNotFoundError → auto-install package (detects package manager)
    • ImportError → add missing import statement
    • NameError → add common imports (Optional, dataclass, Path, etc.)
    • SyntaxError → fix missing colons, f-string prefixes
    • IndentationError → normalize mixed tabs/spaces
  3. Escalation to Blocker - creates informative blockers when stuck

    • Triggered after MAX_SAME_ERROR_ATTEMPTS (3) failures on same error
    • Triggered after MAX_SAME_FILE_ATTEMPTS (3) failures on same file
    • Triggered after MAX_TOTAL_FAILURES (5) in a run
    • Blocker includes error type, attempted fixes, and guidance questions
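
The tracker behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the contents of core/fix_tracker.py: the class name, method names, and regular expressions are assumptions, and only MAX_SAME_ERROR_ATTEMPTS comes from the text.

```python
import re
from collections import Counter

MAX_SAME_ERROR_ATTEMPTS = 3  # threshold named in the notes above


def normalize_error(message: str) -> str:
    """Strip volatile details so equivalent errors compare equal."""
    message = re.sub(r"0x[0-9a-fA-F]+", "<addr>", message)  # memory addresses
    message = re.sub(r"line \d+", "line <n>", message)       # line numbers
    return message.strip()


class FixAttemptTracker:
    """Hypothetical tracker of (error_signature, fix_description) pairs."""

    def __init__(self) -> None:
        self._attempts: dict[tuple[str, str], bool] = {}
        self._failures = Counter()

    def record(self, error: str, fix_description: str, succeeded: bool) -> None:
        signature = normalize_error(error)
        self._attempts[(signature, fix_description)] = succeeded
        if not succeeded:
            self._failures[signature] += 1

    def already_tried(self, error: str, fix_description: str) -> bool:
        return (normalize_error(error), fix_description) in self._attempts

    def should_escalate(self, error: str) -> bool:
        # Escalate once the same normalized error has failed often enough.
        return self._failures[normalize_error(error)] >= MAX_SAME_ERROR_ATTEMPTS
```

Normalizing before comparison is what lets the same NameError reported on different lines count toward a single escalation threshold.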

Self-Correction Flow

Error occurs
    │
    ├── Try ruff --fix (auto-lint)
    │
    ├── Try pattern-based quick fix (no LLM)
    │   ├── Check if fix already attempted → skip
    │   ├── Apply fix
    │   └── Record outcome in tracker
    │
    ├── Check escalation threshold
    │   └── If exceeded → create escalation blocker
    │
    └── Use LLM to generate fix plan
        ├── Include already-tried fixes to avoid repetition
        ├── Execute fix steps with tracking
        └── Re-verify

Key Self-Correction Methods

  • _run_final_verification(): While loop that re-runs gates after self-correction
  • _attempt_verification_fix(): Orchestrates quick fixes, escalation check, LLM fixes
  • _create_escalation_blocker(): Creates detailed blocker with context
  • _verbose_print(): Conditional stdout output for observability

Phase 2 Complete (2026-01-15): Parallel Batch Execution

All 6 Phase 2 items from CLI_WIREFRAME.md are done:

  1. ✅ work batch resume <batch-id> - re-run failed/blocked tasks
  2. ✅ depends_on field on Task model
  3. ✅ Dependency graph analysis (DAG, cycle detection, topological sort)
  4. ✅ True parallel execution with ThreadPoolExecutor worker pool
  5. ✅ --strategy auto with LLM-based dependency inference
  6. ✅ --retry N automatic retry of failed tasks

Key Phase 2 Modules

  • conductor.py: Batch orchestration with serial/parallel/auto strategies
  • dependency_graph.py: DAG operations, level-based grouping for parallelization
  • dependency_analyzer.py: LLM analyzes task descriptions to infer dependencies
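
Level-based grouping over a dependency DAG can be illustrated with the standard library. The sketch below is not the code in dependency_graph.py; the function name and the shape of the depends_on mapping (task id to the set of task ids it depends on) are assumptions.

```python
from graphlib import CycleError, TopologicalSorter


def execution_levels(depends_on: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into levels; tasks within one level can run in parallel."""
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()  # raises CycleError if the graph contains a cycle
    levels: list[list[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose dependencies are done
        levels.append(ready)
        sorter.done(*ready)
    return levels
```

Each returned level could be handed to a worker pool as one batch, with the next level starting only after the previous one finishes.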

Agent Implementation Complete (2026-01-14)

All 8 implementation tasks from AGENT_IMPLEMENTATION_TASKS.md are done:

  1. ✅ LLM Adapter Interface (adapters/llm/)
  2. ✅ Task Context Loader (core/context.py)
  3. ✅ Agent Planning (core/planner.py)
  4. ✅ Code Execution Engine (core/executor.py)
  5. ✅ Automatic Blocker Detection (in core/agent.py)
  6. ✅ Gate Integration (in core/agent.py)
  7. ✅ Agent Orchestrator (core/agent.py)
  8. ✅ Wire into Runtime (core/runtime.py)

Bug Fixes During Testing

  • GateResult attribute access: Fixed gate_result.status → gate_result.passed
  • Duplicate task transitions: Removed task status updates from agent.py (runtime handles all)
  • READY→READY error: Added check in stop_run before transitioning
  • Verification step handling: Made _execute_verification smarter about file vs command targets

Key Design Decisions

  • State separation: Agent manages AgentState, Runtime manages TaskStatus
  • Model selection: Task-based heuristic via Purpose enum
  • Blocker creation: Agent creates blockers, Runtime updates task status
  • Verification: Incremental (ruff after each file change) + final (all gates)

Testing

Run all tests

uv run pytest

Run v2 tests only

uv run pytest -m v2           # All v2 tests (~411 tests)
uv run pytest -m v2 -q        # Quiet mode

The v2 marker identifies tests for CLI-first, headless functionality:

  • All tests in tests/core/ are automatically marked v2 (via conftest.py)
  • v2 CLI tests have pytestmark = pytest.mark.v2 at the top

Convention: When adding new v2 functionality, mark tests with @pytest.mark.v2 or add pytestmark = pytest.mark.v2 at module level for CLI tests that use codeframe.cli.app.

Run core module tests

uv run pytest tests/core/
uv run pytest tests/core/test_agent.py -v
uv run pytest tests/adapters/test_llm.py -v

Test coverage

uv run pytest --cov=codeframe --cov-report=html

Environment Variables

# Required for agent execution
ANTHROPIC_API_KEY=sk-ant-...

# Optional - Database
DATABASE_PATH=./codeframe.db

# Optional - Rate Limiting (Phase 2)
RATE_LIMIT_ENABLED=true                    # Enable/disable rate limiting
RATE_LIMIT_DEFAULT=100/minute              # Default limit
RATE_LIMIT_AUTH=10/minute                  # Auth endpoints
RATE_LIMIT_AI=20/minute                    # AI/LLM endpoints
RATE_LIMIT_WEBSOCKET=50/minute             # WebSocket connections
REDIS_URL=redis://localhost:6379           # Redis for distributed rate limiting (optional)

# Optional - API Server
CODEFRAME_API_KEY_SECRET=<random-secret>   # Secret for API key hashing

Legacy sections removed on purpose

This file previously contained extensive v1 details (auth, websocket, UI template, sprint history).
Those are still in git history and legacy docs, but they are not the current contract.

The current contract is Golden Path + Refactor Plan + Command Tree mapping + Agent Implementation.

@claude

claude Bot commented Mar 31, 2026

Review content pending - needs file write permission

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (4)
codeframe/core/adapters/streaming_chat.py (4)

378-381: ⚠️ Potential issue | 🟠 Major

Emit DONE when an interrupt trips mid-stream.

This branch exits the generator without a terminal event, so interrupted responses never produce a clear end-of-turn signal.

🔚 Proposed fix
                     # Honour interrupt between chunks
                     if interrupt_event and interrupt_event.is_set():
-                        return
+                        yield ChatEvent(type=ChatEventType.DONE)
+                        return
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/core/adapters/streaming_chat.py` around lines 378 - 381, The
generator loop at "async for sdk_event in stream" returns immediately when "if
interrupt_event and interrupt_event.is_set()" is true, causing no terminal event
to be emitted; modify that branch to emit/return a terminal DONE event (the same
kind used elsewhere for end-of-turn) before exiting so interrupted responses
still produce a clear end-of-turn signal—update the interrupt branch in the
generator around async for sdk_event in stream to send the standard DONE event
(using the same event type/constructor the codebase uses for normal completion)
and then return.

427-432: ⚠️ Potential issue | 🔴 Critical

Insert the assistant tool_use message before the follow-up tool_result message.

The retry request appends only the user-side tool_result blocks. Anthropic's tool-call flow requires the assistant message containing the tool_use blocks to appear immediately before that user message, so real tool turns can 400 even though the mocked test passes. (platform.claude.com)

Also applies to: 477-480

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/core/adapters/streaming_chat.py` around lines 427 - 432, The code
rebuilds tool inputs from final_msg.content and then appends only the user-side
tool_result, but Anthropic requires the assistant-side tool_use message to
precede any user tool_result; update the logic in streaming_chat.py around
stream.get_final_message()/final_msg handling so that when pending_tool_calls
exists and final_msg has content you first synthesize and insert an assistant
"tool_use" message corresponding to the pending_tool_calls (using the same
structure used for real tool turns) immediately before appending the user
"tool_result" message, then call _rebuild_tool_inputs(final_msg.content,
pending_tool_calls) to reconstruct inputs; ensure the same fix is applied to the
analogous block handling final messages later (the block around the other
occurrence near the second _rebuild_tool_inputs call).

199-200: ⚠️ Potential issue | 🟠 Major

Load the newest history window, not the repo default.

InteractiveSessionRepository.get_messages() defaults to the oldest 100 rows (ORDER BY created_at LIMIT 100), so once a session grows past 100 messages this adapter reconstructs stale context before _truncate_history() ever runs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/core/adapters/streaming_chat.py` around lines 199 - 200, The
adapter is calling InteractiveSessionRepository.get_messages(self._session_id)
which returns the oldest 100 rows by default, causing stale context when
sessions exceed that size; change the call to fetch the newest history window
instead (e.g. pass explicit params to get_messages to ORDER BY created_at DESC
and LIMIT to the desired window size or call a repository method that returns
the latest N messages) so that rows =
self._db_repo.get_messages(self._session_id, order="desc",
limit=<history_window>) (or equivalent) is used before reconstructing context
and before _truncate_history() runs.
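
One way to fetch the newest window while still returning messages in chronological order is to select in descending order and reverse the result. The sketch below uses sqlite3 directly; the messages table, its columns, and the function name are assumptions for illustration and do not reflect InteractiveSessionRepository's actual schema or signature.

```python
import sqlite3


def load_latest_messages(conn: sqlite3.Connection, session_id: str, limit: int = 100):
    """Return the newest `limit` messages for a session, oldest first."""
    rows = conn.execute(
        "SELECT role, content FROM messages "
        "WHERE session_id = ? ORDER BY id DESC LIMIT ?",  # newest rows first
        (session_id, limit),
    ).fetchall()
    rows.reverse()  # restore chronological order for the model
    return [{"role": role, "content": content} for role, content in rows]
```

Ordering by an autoincrement id (an assumption here) avoids ties between rows that share a created_at timestamp.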

241-260: ⚠️ Potential issue | 🟠 Major

Split persistence into “user before stream” and “assistant after completion.”

The user row is only written after _stream_turn() returns. That drops prompts on errors or cancellations, and it also lets interrupted turns persist partial assistant text as if the response finished.

Also applies to: 342-345

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/core/adapters/streaming_chat.py` around lines 241 - 260, The
current _persist_turn writes both user and assistant rows only after
_stream_turn completes, which loses user prompts on errors/cancels and can
record partial assistant text as finished; fix by splitting persistence so the
user message is persisted immediately before streaming begins (call
self._db_repo.add_message with role="user" and content=user_content on the main
flow before invoking _stream_turn) and keep only the assistant persistence for
completed responses in _persist_turn (or an _persist_assistant method) so that
assistant text is written only after successful completion (and handle
cancellations/errors to avoid writing partial assistant content); update
references to _persist_turn, _stream_turn, and add_message accordingly.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d0293c5b-a769-4481-a100-8cca2625cdac

📥 Commits

Reviewing files that changed from the base of the PR and between bdef057 and e19eb48.

📒 Files selected for processing (3)
  • codeframe/core/adapters/streaming_chat.py
  • codeframe/ui/routers/session_chat_ws.py
  • tests/core/test_streaming_chat.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • codeframe/ui/routers/session_chat_ws.py

Comment on lines +477 to +480
# Append the tool results as a user message for the next turn
current_messages = current_messages + [
{"role": "user", "content": tool_result_blocks}
]

⚠️ Potential issue | 🟠 Major

Re-apply the history budget before each tool-driven retry.

_truncate_history() only runs before the first API call. After appending tool_result blocks here, a large read_file or search_codebase result can push the very next messages.stream() call past the 180k budget.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/core/adapters/streaming_chat.py` around lines 477 - 480, The
history budget is only enforced once via _truncate_history() before the first
API call, so after appending tool_result_blocks to current_messages you must
re-apply truncation to avoid exceeding the 180k budget on the next
messages.stream() call; update the retry path in the streaming logic to call
_truncate_history(current_messages or the equivalent message list) immediately
after appending {"role":"user","content":tool_result_blocks} (and before
invoking messages.stream() or any retry of send/stream), ensuring you pass the
same parameters used originally (e.g., model/context limits) so the truncated
message list is safe for subsequent calls.
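
Re-truncation after a tool turn has to cope with list-valued content and must not leave a tool_result at the head of the list without its tool_use. The following is a rough sketch under those constraints, not the PR's _truncate_history: the function names are hypothetical and a whitespace word count stands in for the tiktoken encoder.

```python
def content_text(content) -> str:
    """Flatten string or block-list content into text for counting."""
    if isinstance(content, str):
        return content
    parts = []
    for block in content or []:
        if isinstance(block, dict):
            inner = block.get("content") or block.get("text") or ""
            parts.append(inner if isinstance(inner, str) else str(inner))
    return " ".join(parts)


def approx_tokens(text: str) -> int:
    # Stand-in counter (assumption); a real adapter would use its tokenizer.
    return len(text.split())


def has_tool_result(message: dict) -> bool:
    content = message.get("content")
    return isinstance(content, list) and any(
        isinstance(b, dict) and b.get("type") == "tool_result" for b in content
    )


def truncate_to_budget(messages: list, budget: int, count=approx_tokens) -> list:
    """Drop the oldest user/assistant pairs until the history fits the budget."""
    kept = list(messages)

    def total() -> int:
        return sum(count(content_text(m.get("content"))) for m in kept)

    while len(kept) > 2 and total() > budget:
        if has_tool_result(kept[2]):
            break  # dropping this pair would orphan a tool_result
        del kept[:2]
    return kept
```

Calling this right after appending the tool_result message, and before the next messages.stream() call, would keep each request within the budget where that is possible.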

@claude

claude Bot commented Mar 31, 2026

Review: StreamingChatAdapter

Good work replacing the echo stub with a real adapter. The event model, safe tool set, interrupt support, and test structure are all solid. A few issues worth addressing before merge.

Bug: Missing assistant message in tool loop

In _stream_turn, when pending tool calls are executed the code builds the next turn by appending only the user message with tool_result blocks, but never first appends the assistant message containing the tool_use blocks. The Anthropic API requires alternating assistant (with tool_use) then user (with tool_result). Sending only the tool_result user message will cause API errors or context corruption on any real tool loop. The final message from stream.get_final_message() already has the full assistant content; it just needs to be added to current_messages before the tool_result turn.
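
The required ordering can be made explicit with a small helper that also checks that every tool_result answers a tool_use from the preceding assistant turn. This is a sketch only: append_tool_turn is a hypothetical name, and the blocks are assumed to be plain dicts, whereas the SDK's final message carries typed block objects.

```python
def append_tool_turn(messages: list, assistant_content: list, tool_result_blocks: list) -> list:
    """Return a new list ending with the assistant tool_use turn, then the user tool_result turn."""
    requested = {b["id"] for b in assistant_content if b.get("type") == "tool_use"}
    answered = {b["tool_use_id"] for b in tool_result_blocks if b.get("type") == "tool_result"}
    if requested != answered:
        raise ValueError(
            f"tool_result ids {sorted(answered)} do not match tool_use ids {sorted(requested)}"
        )
    return messages + [
        {"role": "assistant", "content": assistant_content},  # tool_use blocks
        {"role": "user", "content": tool_result_blocks},      # matching tool_result blocks
    ]
```

Failing locally on an id mismatch gives a clearer error than a rejected API request.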

Bug: DONE event not emitted after ERROR

In send_message, when _stream_turn raises, an ERROR event is yielded and the generator returns with no DONE following. Similarly in _run_streaming_adapter, exceptions before iteration starts send an error dict but no done dict. The relay task almost certainly loops until it sees done, so without it the WebSocket relay will hang. A DONE event should always be the final event, even on error.
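
One way to guarantee a terminal event is to wrap the event stream in a generator that converts exceptions into an error event and always finishes with done. The sketch below uses plain dicts in place of ChatEvent, and with_terminal_done is a hypothetical name, not part of the adapter.

```python
from typing import AsyncIterator


async def with_terminal_done(events: AsyncIterator[dict]) -> AsyncIterator[dict]:
    """Relay events, ensuring exactly one final 'done' even if the source fails."""
    try:
        async for event in events:
            yield event
            if event.get("type") == "done":
                return  # source already terminated cleanly
    except Exception as exc:
        yield {"type": "error", "content": str(exc)}
    yield {"type": "done"}
```

The done event is yielded after the try block rather than in a finally clause, because yielding from finally in an async generator misbehaves when the consumer closes the generator early.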

API confusion: empty list vs. no history

The check 'if not history' treats [] (valid empty history) the same as None (no history supplied). There is no way to skip DB loading for a fresh session. Using None as the sentinel would make the intent explicit. Also: _load_history() is a synchronous DB call invoked from async send_message without asyncio.to_thread, which will block the event loop under load.

No iteration limit in the tool loop

The while True loop in _stream_turn has no max-iteration guard. A model that repeatedly returns stop_reason == tool_use would loop indefinitely and rack up API costs. A simple cap with a logged warning would prevent this.

TOOL_USE_START emits empty tool_input for streamed inputs

At content_block_start time, getattr(block, 'input', {}) is typically empty because input arrives via subsequent input_json_delta events (currently ignored with pass). The TOOL_USE_START event is yielded immediately with an empty dict. _rebuild_tool_inputs corrects this internally but the client already received the empty payload. Worth a comment at minimum.

Security: workspace_path falls back silently to CWD

When raw_workspace is None, the fallback is Path('.'), scoping all file reads to the server CWD. The warning is logged but the adapter proceeds, which could expose server files depending on deployment. Consider raising an error or emitting an error event instead.
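
A stricter alternative to the CWD fallback might look like the sketch below, which refuses to start without a workspace and rejects paths that escape it. Both function names are hypothetical, and how the adapter should surface the failure (exception or error event) is left open.

```python
from pathlib import Path
from typing import Optional


def require_workspace(raw_workspace: Optional[str]) -> Path:
    """Fail loudly instead of falling back to the server's working directory."""
    if not raw_workspace:
        raise ValueError("workspace_path is required for interactive sessions")
    root = Path(raw_workspace).resolve()
    if not root.is_dir():
        raise ValueError(f"workspace_path is not a directory: {root}")
    return root


def resolve_in_workspace(root: Path, relative: str) -> Path:
    """Resolve a tool-supplied path and reject anything outside the workspace."""
    candidate = (root / relative).resolve()
    if not candidate.is_relative_to(root):  # Path.is_relative_to needs Python 3.9+
        raise PermissionError(f"path escapes workspace: {relative}")
    return candidate
```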

Minor

send_message is annotated -> AsyncIterator[ChatEvent] but is implemented with yield, so it is an async generator. Type checkers accept AsyncIterator here, but AsyncGenerator[ChatEvent, None] describes it more precisely.

The test test_yields_tool_use_start_and_result relies on the message_stop flush path (no content_block_stop event). A comment noting that this is intentional coverage of that branch would help.

Summary

Critical: (1) missing assistant message in tool loop, (2) missing DONE-after-ERROR. Everything else is medium priority. Test coverage is good; a test that exercises two or more full tool loop turns would catch the tool loop bug directly.

…g_adapter

_run_streaming_adapter now takes db_repo and workspace_path as additional
arguments. Tests that patched it with 4-arg fake adapters would silently fail
(asyncio task drops TypeError) and hang waiting for queue events that never arrive.

Updated all three fake adapters to accept the full 6-argument signature.
@claude

claude Bot commented Mar 31, 2026

Review — round 3 (after commit e19eb48)

Good progress on commit e19eb48 — the medium issues from my first review are resolved: _truncate_history now drops in pairs and enforces the user-first invariant, system= is added, and the pricing table has a last-verified date. Thanks for addressing those.

The two critical bugs flagged in my second review are still unresolved.


Critical — still blocking merge

1. Missing assistant message in the tool loop

_stream_turn appends only the user-side tool_result message before re-entering the API:

# current code (line ~495)
current_messages = current_messages + [
    {"role": "user", "content": tool_result_blocks}
]

The Anthropic Messages API requires the assistant's tool_use response to precede any tool_result. The correct sequence is:

current_messages = current_messages + [
    {"role": "assistant", "content": final_msg.content},   # tool_use blocks
    {"role": "user",      "content": tool_result_blocks},  # tool_result blocks
]

final_msg is already in scope at that point (set by the message_stop handler). Without this, every tool call will hit an Anthropic API validation error (messages: roles must alternate between "user" and "assistant").


2. Missing DONE after ERROR

In send_message, when _stream_turn raises, an ERROR event is yielded and the generator returns with no terminal DONE:

except Exception as exc:
    yield ChatEvent(type=ChatEventType.ERROR, content=str(exc))
    return   # ← relay task loops forever waiting for DONE

Fix:

except Exception as exc:
    yield ChatEvent(type=ChatEventType.ERROR, content=str(exc))
    yield ChatEvent(type=ChatEventType.DONE)
    return

Medium — should fix before merge

3. Interrupt inside the stream loop returns without DONE

The check at the top of async for sdk_event in stream calls bare return when the interrupt fires, with no DONE emitted. The tool-execution interrupt check correctly yields DONE first (good); the stream-chunk interrupt check should match:

if interrupt_event and interrupt_event.is_set():
    yield ChatEvent(type=ChatEventType.DONE)
    return

4. _load_history() is synchronous but called in async context

_load_history calls self._db_repo.get_messages(...) directly (a blocking DB call) without asyncio.to_thread. This will block the event loop on every turn. Should be wrapped the same way _persist_turn wraps add_message:

# In send_message:
if history is None:
    history = await asyncio.to_thread(self._db_repo.get_messages, self._session_id)
    history = [{"role": r["role"], "content": r["content"]} for r in history]

This also resolves the if not history / if history is None distinction flagged previously — use None as the sentinel so callers can pass an explicit empty list without triggering a DB reload.

5. No iteration limit on the while True tool loop

A model stuck in a tool_use → tool_result cycle will spin indefinitely. Add a cap (e.g., MAX_TOOL_TURNS = 10) and yield an ERROR + DONE when exceeded.
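
A bounded loop of that kind could be sketched as follows. The stream_once callable is a hypothetical stand-in for one API round trip that returns the stop reason, and events are plain dicts rather than ChatEvent objects.

```python
MAX_TOOL_TURNS = 10  # cap suggested above


async def run_tool_loop(stream_once, max_turns: int = MAX_TOOL_TURNS):
    """Re-enter the stream after tool calls, but never more than max_turns times."""
    for _ in range(max_turns):
        stop_reason = await stream_once()
        if stop_reason != "tool_use":
            yield {"type": "done"}  # normal end of turn
            return
    # Cap reached: report the problem and still terminate the turn.
    yield {"type": "error", "content": f"tool loop exceeded {max_turns} turns"}
    yield {"type": "done"}
```

A for loop with an explicit bound replaces while True, so the worst-case number of API calls per user message is known in advance.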


Minor

_truncate_history will fail after tool results are appended

After a tool turn, current_messages contains entries where content is list[dict] (the tool_result_blocks). If re-truncation were ever added (see CodeRabbit's note), the _count function would pass a list to enc.encode(...) and raise TypeError. The token counter should guard for non-string content:

def _count(msgs):
    total = 0
    for m in msgs:
        c = m.get("content") or ""
        if isinstance(c, str):
            total += len(enc.encode(c))
        elif isinstance(c, list):
            total += sum(len(enc.encode(b.get("content") or "")) for b in c if isinstance(b, dict))
    return total

Summary

| Issue | Status |
|---|---|
| _truncate_history pair-drop + user-first | ✅ Fixed |
| system= prompt | ✅ Fixed |
| _estimate_cost last-verified comment | ✅ Fixed |
| workspace_path warning | ✅ Partial (warning logged; still silently uses ".") |
| Missing assistant message in tool loop | ❌ Still present (API will reject tool turns) |
| DONE not emitted after ERROR | ❌ Still present (relay will hang) |
| Interrupt in stream returns without DONE | ❌ Still present |
| _load_history blocks event loop | ❌ Still present |
| No iteration limit on tool loop | ❌ Still present |

The two critical issues will cause observable failures the first time a real tool call is made and the first time the Anthropic API returns an error. Please address those before merge.

Development

Successfully merging this pull request may close these issues.

Backend: Streaming chat adapter for Anthropic SDK (token-by-token relay)
