
feat(adapters): abstract streaming layer behind LLMProvider (#548)#553

Merged
frankbria merged 3 commits into main from feat/548-abstract-streaming-layer
Apr 7, 2026

Conversation

@frankbria (Owner) commented Apr 6, 2026

Summary

  • Add StreamChunk dataclass to base.py as a normalized streaming type shared across all providers
  • Add async_stream() and supports() to LLMProvider ABC
  • Implement async_stream() in AnthropicProvider — moves Anthropic SDK streaming out of streaming_chat.py; supports("extended_thinking") returns True
  • Implement async_stream() in OpenAIProvider — translates OpenAI SSE chunks to StreamChunk
  • Add async_stream() to MockProvider with configurable StreamChunk sequences for testing
  • Refactor StreamingChatAdapter to accept LLMProvider; no import anthropic remains
  • Update session_chat_ws.py to construct AnthropicProvider explicitly as the composition root
  • Fix test_e2b_adapter.py to skip TestE2BAgentAdapter when the e2b package is not installed
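
For orientation, the new surface area in base.py might look roughly like the sketch below. Field names and signatures beyond those listed above are illustrative assumptions, not the merged code:

```python
from abc import ABC
from dataclasses import dataclass, field
from typing import AsyncIterator, Optional


@dataclass
class StreamChunk:
    """Illustrative shape of the normalized streaming event."""
    type: str  # "text_delta" | "thinking_delta" | "tool_use_start" | "tool_use_stop" | "message_stop"
    text: str = ""
    stop_reason: Optional[str] = None
    input_tokens: int = 0
    output_tokens: int = 0
    tool_inputs_by_id: dict = field(default_factory=dict)


class LLMProvider(ABC):
    def supports(self, capability: str) -> bool:
        # Conservative default: providers opt in to capabilities explicitly.
        return False

    async def async_stream(
        self, messages, system, tools, model, max_tokens, interrupt_event=None
    ) -> AsyncIterator["StreamChunk"]:
        raise NotImplementedError
        if False:  # unreachable yield makes this an async generator
            yield
```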

Closes #548

Acceptance Criteria

  • streaming_chat.py has no import anthropic
  • Streaming works with Anthropic provider (regression tests)
  • Streaming works with OpenAI provider (new async_stream() implementation)
  • Extended thinking gracefully no-ops on non-Anthropic providers (supports() returns False by default)
  • Existing streaming tests pass (updated to use MockProvider)

Test plan

  • tests/core/test_streaming_chat.py — 20 tests, all pass; updated to use MockProvider.async_stream() instead of patching _async_client
  • tests/adapters/test_llm.py, test_llm_async.py, test_llm_openai.py — 62 tests, all pass
  • Full backend suite — 2217 passed, 9 skipped (e2b optional extra)
  • ruff check . — no errors

Summary by CodeRabbit

  • New Features
    • Adds real-time async streaming support across providers, including "extended thinking" mode where supported
  • Improvements
    • Adapter is provider-agnostic for easier integration with multiple LLMs
    • More robust streaming of tool calls, thinking/text deltas, early interrupt handling, and end-of-message cost reporting
  • Tests
    • Updated tests to drive streaming via providers and skip adapter tests when optional extras are missing

- Add StreamChunk dataclass to base.py as normalized streaming type
- Add async_stream() + supports() to LLMProvider ABC
- Implement async_stream() in AnthropicProvider (moves streaming out of
  streaming_chat.py; supports("extended_thinking") returns True)
- Implement async_stream() in OpenAIProvider (SSE → StreamChunk)
- Add async_stream() to MockProvider with configurable StreamChunk sequences
- Refactor StreamingChatAdapter to accept LLMProvider; no import anthropic
- Update session_chat_ws.py to construct AnthropicProvider explicitly
- Export StreamChunk from adapters.llm.__init__
- Update streaming chat tests to use MockProvider instead of _async_client patch
- Fix test_e2b_adapter.py to skip TestE2BAgentAdapter when e2b not installed
@coderabbitai bot (Contributor) commented Apr 6, 2026

Walkthrough

A provider-agnostic async streaming abstraction was added: StreamChunk and LLMProvider.async_stream()/supports() were introduced. Anthropic, OpenAI, and Mock providers implement async_stream() to emit normalized StreamChunks. StreamingChatAdapter was refactored to consume provider chunks and now accepts an injected LLMProvider; callers now instantiate and pass an AnthropicProvider where needed. Tests updated to use MockProvider streaming and optional e2b skipping added.

Changes

Cohort / File(s) — Summary

  • Base Streaming API — codeframe/adapters/llm/base.py, codeframe/adapters/llm/__init__.py
    Added the StreamChunk dataclass and re-exported it. Extended LLMProvider with supports(capability: str) -> bool (default False) and the async_stream(...) -> AsyncIterator[StreamChunk] entry point.
  • Provider Implementations — codeframe/adapters/llm/anthropic.py, codeframe/adapters/llm/openai.py, codeframe/adapters/llm/mock.py
    Implemented async_stream() in each provider to convert provider-specific streaming events into normalized StreamChunk types (text_delta, thinking_delta, tool_use_start, tool_use_stop, message_stop). Anthropic adds supports("extended_thinking"); OpenAI maps function-calling/tool streaming and translates API errors to LLM error types; MockProvider adds queueing and add_stream_chunks() for tests.
  • Adapter & Router — codeframe/core/adapters/streaming_chat.py, codeframe/ui/routers/session_chat_ws.py
    Refactored StreamingChatAdapter to consume provider async_stream() chunks and accept an injected LLMProvider (defaults to constructing AnthropicProvider when None). session_chat_ws now explicitly instantiates and passes AnthropicProvider. Tool input/token handling and thinking events now come from StreamChunk fields.
  • Tests & Optional Dependency — tests/core/test_streaming_chat.py, tests/adapters/test_e2b_adapter.py
    Tests rewritten to use MockProvider and StreamChunk sequences; new helpers produce message_stop chunks. test_e2b_adapter.py skips when the optional e2b dependency is absent, detected via importlib.util.find_spec.
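
The queue-and-replay test pattern described above can be sketched with a self-contained stand-in. The names add_stream_chunks() and the chunk fields mirror this PR's summary, but the internals here are assumptions, not the real MockProvider:

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class StreamChunk:
    type: str
    text: str = ""
    tool_inputs_by_id: dict = field(default_factory=dict)


class MockProvider:
    """Minimal stand-in showing the queue-and-replay pattern used by tests."""

    def __init__(self):
        self.stream_chunks: list[StreamChunk] = []

    def add_stream_chunks(self, chunks):
        self.stream_chunks.extend(chunks)

    async def async_stream(self, *args, **kwargs):
        # Replay the queued chunks in order, as a normal provider would emit them.
        for chunk in self.stream_chunks:
            yield chunk


async def collect():
    provider = MockProvider()
    provider.add_stream_chunks([
        StreamChunk(type="text_delta", text="Hello"),
        StreamChunk(type="message_stop"),
    ])
    return [c.type async for c in provider.async_stream()]


print(asyncio.run(collect()))  # ['text_delta', 'message_stop']
```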

Sequence Diagram

sequenceDiagram
    participant Adapter as StreamingChatAdapter
    participant Provider as LLMProvider<br/>(Anthropic/OpenAI/Mock)
    participant API as External API<br/>(Anthropic/OpenAI)    
    Adapter->>Provider: async_stream(messages, system, tools, ...)
    Provider->>API: open/stream request (provider-specific)
    loop stream events
        API-->>Provider: provider event / delta
        Provider->>Provider: normalize -> StreamChunk
        Provider-->>Adapter: emit StreamChunk
        Adapter->>Adapter: handle chunk (text/thinking/tool/cost)
    end
    API-->>Provider: completion / final message
    Provider->>Adapter: emit final message_stop StreamChunk (tokens, tool_inputs_by_id)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

Poem

🐇 I hopped through code and found the streams,

Chunks lined up like candies in my dreams.
Anthropic, OpenAI, Mock now sing along,
One gentle stream, one tidy song.
Hop—abstraction stitched and strong.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning — Docstring coverage is 60.38%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
✅ Passed checks (4 passed)
  • Description Check ✅ Passed — Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check ✅ Passed — The PR title 'feat(adapters): abstract streaming layer behind LLMProvider (#548)' clearly and concisely describes the main change.
  • Linked Issues Check ✅ Passed — The PR implements all requirements from issue #548: the StreamChunk dataclass, async_stream()/supports() on LLMProvider, streaming in AnthropicProvider/OpenAIProvider/MockProvider, StreamingChatAdapter refactored to accept LLMProvider without Anthropic imports, and extended_thinking via provider.supports().
  • Out of Scope Changes Check ✅ Passed — All changes directly support abstracting streaming behind LLMProvider; the E2B test skip marker is a minor dependency-handling improvement. No unrelated modifications were introduced.


@claude bot commented Apr 6, 2026

Review: feat(adapters): abstract streaming layer behind LLMProvider (#548)

This is a solid architectural improvement — streaming_chat.py is now genuinely provider-agnostic, and the StreamChunk normalization gives a clean contract for all providers to implement. The test refactor from patching _async_client internals to MockProvider.add_stream_chunks() is a real quality-of-life improvement for future test authors.


Issues worth addressing

1. tool_use_stop ordering is inconsistent between providers

In anthropic.py, tool_use_stop is emitted on content_block_stop — i.e., as each tool call's streaming completes, mid-stream. In openai.py, all tool_use_stop chunks are emitted after the async for loop finishes (right before message_stop). This means with OpenAI, a consumer never sees interleaved tool_use_start → [deltas] → tool_use_stop sequences — all the stops come at the end.

The current consumer in streaming_chat.py appears tolerant of this (it uses tool_inputs_by_id from message_stop rather than reacting to tool_use_stop), but the inconsistency is a latent footgun if anyone builds against the protocol expecting Anthropic semantics. Consider either documenting this difference in StreamChunk's docstring or fixing OpenAI to emit tool_use_stop within the loop once finish_reason signals a completed tool (harder without SSE markers, I know).
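
A consumer that stays correct under either ordering looks like this sketch; plain dicts stand in for StreamChunk, and the sample data is invented for illustration:

```python
def replay(chunks):
    """Accumulate text deltas; take final tool inputs only from message_stop,
    ignoring tool_use_stop for state, mirroring what streaming_chat.py does."""
    tool_inputs = {}
    text = []
    for chunk in chunks:
        if chunk["type"] == "text_delta":
            text.append(chunk["text"])
        elif chunk["type"] == "message_stop":
            tool_inputs = chunk.get("tool_inputs_by_id", {})
    return "".join(text), tool_inputs


# Anthropic-style (interleaved stop) vs OpenAI-style (stop batched at the end)
anthropic_order = [
    {"type": "tool_use_start", "tool_id": "t1"},
    {"type": "tool_use_stop", "tool_id": "t1"},
    {"type": "text_delta", "text": "done"},
    {"type": "message_stop", "tool_inputs_by_id": {"t1": {"x": 1}}},
]
openai_order = [
    {"type": "tool_use_start", "tool_id": "t1"},
    {"type": "text_delta", "text": "done"},
    {"type": "tool_use_stop", "tool_id": "t1"},
    {"type": "message_stop", "tool_inputs_by_id": {"t1": {"x": 1}}},
]
print(replay(anthropic_order) == replay(openai_order))  # True
```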

2. session_chat_ws.py constructs AnthropicProvider twice unnecessarily

# session_chat_ws.py
provider = AnthropicProvider()
adapter = StreamingChatAdapter(..., provider=provider)

The explicit construction here is correct (composition root pattern). But StreamingChatAdapter.__init__ still has its own AnthropicProvider() fallback for the provider=None case. Since session_chat_ws.py always provides a provider, the fallback is now only exercised by the backward-compat test (test_accepts_api_key_from_env). This is fine, but worth a comment in __init__ noting the fallback exists for backwards compatibility.
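
A sketch of the suggested comment in context; the class and provider names follow the PR, but both bodies here are illustrative stand-ins:

```python
class AnthropicProvider:
    """Stand-in for codeframe's real AnthropicProvider."""


class StreamingChatAdapter:
    def __init__(self, provider=None):
        if provider is None:
            # Backward compatibility only: production callers (session_chat_ws.py)
            # always inject a provider explicitly at the composition root.
            provider = AnthropicProvider()
        self._provider = provider


adapter = StreamingChatAdapter()
print(type(adapter._provider).__name__)  # AnthropicProvider
```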

3. base.py: async_stream() is not @abstractmethod

The if False: yield sentinel is a known pattern for making a raise NotImplementedError method into an async generator — but skipping @abstractmethod means subclasses that forget to override it will only fail at call time, not at instantiation. Given supports() is also non-abstract (intentional for extensibility), this may be a consistent design choice, but it's worth a comment like # not @abstractmethod — providers may omit if streaming is unsupported.
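
The sentinel pattern and its call-time (not instantiation-time) failure mode can be demonstrated in isolation; Base here is a stand-in, not the real LLMProvider:

```python
import asyncio
import inspect


class Base:
    async def async_stream(self):
        # Not @abstractmethod: providers may omit streaming support entirely.
        raise NotImplementedError("streaming not supported by this provider")
        if False:  # unreachable yield turns this into an async generator
            yield


# The yield (even unreachable) makes this an async generator function.
assert inspect.isasyncgenfunction(Base.async_stream)


async def demo():
    # Instantiation succeeds; the error surfaces only when iteration begins.
    try:
        async for _ in Base().async_stream():
            pass
    except NotImplementedError:
        return "failed at call time, not at instantiation"


print(asyncio.run(demo()))  # failed at call time, not at instantiation
```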


Minor nits

  • mock.py default fallback: output_tokens=len(self.default_response) uses character count as a proxy for token count. Fine for tests, but could surprise someone who asserts exact values expecting token semantics.

  • openai.py line ~264: partial_tool_calls accumulates across the loop but is never reset between calls. Since async_stream is a fresh method invocation each time, there's no actual statefulness bug — just confirming this is intentional.


What's working well

  • Moving import anthropic to the composition root (session_chat_ws.py / AnthropicProvider.__init__) and out of streaming_chat.py is exactly right.
  • StreamChunk dataclass fields are well-documented and cleanly typed.
  • MockProvider.reset() correctly clears stream_chunks and stream_index.
  • The e2b skip using importlib.util.find_spec is cleaner than a try/except import guard.
  • System prompt moved outside the while True: loop in _stream_turn — small but correct.
  • Test coverage for test_non_anthropic_provider_no_thinking is a nice addition.

The tool_use_stop ordering issue is the only thing I'd want resolved before merge (or explicitly documented as a known difference). Everything else is polish.

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 5

🧹 Nitpick comments (2)
codeframe/adapters/llm/openai.py (2)

235-248: Duplicate tool conversion logic.

This inline tool conversion duplicates the logic in _convert_tools (lines 401-413). Consider reusing the existing method for consistency and maintainability.

Proposed refactor to reuse _convert_tools

If the signature is updated to accept list[Tool]:

         # Convert tools from Anthropic-style (input_schema) to OpenAI format
         if tools:
-            kwargs["tools"] = [
-                {
-                    "type": "function",
-                    "function": {
-                        "name": t["name"],
-                        "description": t.get("description", ""),
-                        "parameters": t.get("input_schema", {}),
-                    },
-                }
-                for t in tools
-            ]
+            kwargs["tools"] = self._convert_tools(tools)
             kwargs["tool_choice"] = "auto"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/adapters/llm/openai.py` around lines 235 - 248, The inline
conversion block that builds kwargs["tools"] and sets kwargs["tool_choice"]
duplicates logic in _convert_tools; replace the inline conversion with a call to
the existing _convert_tools(...) helper (passing the tools list and any expected
params) and assign its return to kwargs["tools"], then set kwargs["tool_choice"]
= "auto" only if _convert_tools returns a non-empty list; update _convert_tools
signature if needed to accept the current Tool shape so the conversion is
centralized and consistent with the rest of the module.

305-315: Silent fallback on JSON decode errors may hide malformed tool arguments.

When json.loads fails, the code silently defaults to {} (line 313). This could mask issues where the LLM produces malformed JSON, making debugging difficult. Consider logging a warning when this occurs.

Proposed improvement to add logging
+import logging
+
+logger = logging.getLogger(__name__)
+
         # Build tool_inputs_by_id from accumulated partial tool calls
         tool_inputs_by_id: dict = {}
         for tc in partial_tool_calls.values():
             try:
                 tool_inputs_by_id[tc["id"]] = json.loads(
                     "".join(tc["arguments_parts"]) or "{}"
                 )
             except json.JSONDecodeError:
+                logger.warning(
+                    "Failed to parse tool arguments for tool %s: %s",
+                    tc["name"],
+                    "".join(tc["arguments_parts"]),
+                )
                 tool_inputs_by_id[tc["id"]] = {}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/adapters/llm/openai.py` around lines 305 - 315, The loop that
builds tool_inputs_by_id currently swallows JSONDecodeError and silently uses {}
for malformed tool arguments; update the exception handler in the block
iterating partial_tool_calls so that when json.loads fails you log a warning
(including the tool id tc["id"] and the raw arguments string
"".join(tc["arguments_parts"])) before falling back to {}), referencing the
existing logger used in this module (e.g., module-level logger or
process_logger) and keep the existing behavior of yielding
StreamChunk(type="tool_use_stop") after each tool call.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@codeframe/adapters/llm/anthropic.py`:
- Around line 248-263: The code calls the async method
stream.get_final_message() without awaiting it, so final_msg becomes a coroutine
and attribute access fails; update the block around stream.get_final_message()
to await the call (i.e., final_msg = await stream.get_final_message()) before
using final_msg.stop_reason, final_msg.content, final_msg.usage, and building
tool_inputs_by_id, ensuring the function containing this logic is async or
otherwise supports awaiting, and preserve the existing StreamChunk construction
that references final_msg.

In `@codeframe/adapters/llm/openai.py`:
- Around line 281-293: The early emission of StreamChunk(type="tool_use_start")
in the partial_tool_calls creation may send empty tool_id/tool_name because
tc_delta.id or tc_delta.function.name can be None; change the logic in the block
that creates partial_tool_calls (where partial_tool_calls[idx] is initialized
from tc_delta) to defer emitting the "tool_use_start" StreamChunk until both
tc_delta.id and tc_delta.function.name are non-empty (or record the partial
entry without emitting and emit the StreamChunk later when a subsequent tc_delta
provides the missing id/name), and ensure updates to the existing
partial_tool_calls entry trigger a single emission of
StreamChunk(type="tool_use_start") with populated tool_id and tool_name instead
of emitting early with empty strings.
- Around line 201-209: The async_stream method lacks the same OpenAI exception
handling and has an inconsistent tools type: update async_stream to mirror
async_complete by wrapping the streaming call in a try/except that catches
OpenAI exceptions (AuthenticationError, RateLimitError, APIConnectionError) and
rethrows/translate them the same way async_complete does so callers receive
consistent OpenAIError semantics, and change the tools parameter type from
list[dict] to list[Tool] to match other methods (refer to async_stream and
async_complete function names and the tools param) so callers can pass the same
Tool objects across methods.

In `@codeframe/core/adapters/streaming_chat.py`:
- Around line 352-357: The system_prompt currently interpolates
self._workspace_path and leaks an absolute path to the LLM; update the prompt
construction in the code where system_prompt is defined (in streaming_chat.py,
the variable system_prompt inside the relevant class/method) to remove
{self._workspace_path} and use a neutral phrase such as "the current workspace"
(e.g., replace the f-string that includes self._workspace_path with a static
string mentioning "the current workspace") so the model retains context without
exposing internal filesystem layout.
- Around line 363-370: Compute a boolean by calling
supports("extended_thinking") and pass it into the provider stream call (the
async_stream(...) invocation) as an explicit keyword option (e.g.,
extended_thinking=extended_thinking) so the provider can emit
thinking_delta/THINKING events only when opted-in; update the async_stream call
site in streaming_chat.py and ensure callers of async_stream (and any provider
implementations) receive this flag so emission of THINKING/thinking_delta is
gated by supports("extended_thinking").

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 587f1db9-df00-40c1-bb30-77159a3c2c06

📥 Commits

Reviewing files that changed from the base of the PR and between 21785ae and b60eb7a.

📒 Files selected for processing (9)
  • codeframe/adapters/llm/__init__.py
  • codeframe/adapters/llm/anthropic.py
  • codeframe/adapters/llm/base.py
  • codeframe/adapters/llm/mock.py
  • codeframe/adapters/llm/openai.py
  • codeframe/core/adapters/streaming_chat.py
  • codeframe/ui/routers/session_chat_ws.py
  • tests/adapters/test_e2b_adapter.py
  • tests/core/test_streaming_chat.py

Comment on lines +248 to +263
                    final_msg = stream.get_final_message()
                    stop_reason = final_msg.stop_reason or "end_turn"

                    # Build tool_inputs_by_id from final content blocks
                    tool_inputs_by_id: dict = {}
                    if hasattr(final_msg, "content"):
                        for block in final_msg.content:
                            if getattr(block, "type", None) == "tool_use" and hasattr(block, "id"):
                                tool_inputs_by_id[block.id] = getattr(block, "input", {})

                    yield StreamChunk(
                        type="message_stop",
                        stop_reason=stop_reason,
                        input_tokens=final_msg.usage.input_tokens,
                        output_tokens=final_msg.usage.output_tokens,
                        tool_inputs_by_id=tool_inputs_by_id,

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

In the official Anthropic Python SDK async streaming API, does AsyncMessageStream.get_final_message() need to be awaited inside async with client.messages.stream(...) as stream?

💡 Result:

No.

AsyncMessageStream.get_final_message() is an async def, so it must be awaited, but it does not need to be awaited inside the async with client.messages.stream(...) as stream: block. In the official SDK examples, it’s awaited after the context manager, because the stream has already been fully consumed, so get_final_message() can return immediately. [1]

You only need to call/await it inside the async with block if you haven’t finished consuming the stream yet—because get_final_message() will itself wait for completion via await self.until_done(). [2]

Sources: README example usage [1], SDK implementation of AsyncMessageStream.get_final_message() [2].

[1] (github.com)
[2] (github.com)


🏁 Script executed:

cd codeframe && git ls-files adapters/llm/anthropic.py

Repository: frankbria/codeframe

Length of output: 88


🏁 Script executed:

cat -n codeframe/adapters/llm/anthropic.py | sed -n '240,270p'

Repository: frankbria/codeframe

Length of output: 1630


🏁 Script executed:

cat -n codeframe/adapters/llm/anthropic.py | sed -n '180,250p'

Repository: frankbria/codeframe

Length of output: 3369


Await the async stream's final message before reading it.

On line 248, stream.get_final_message() is asynchronous but called without await. Without awaiting, final_msg is a coroutine object, causing the attribute accesses on lines 249, 254–256, and 261–262 to fail at runtime.

🐛 Proposed fix
-                    final_msg = stream.get_final_message()
+                    final_msg = await stream.get_final_message()

Comment on lines +201 to +209
    async def async_stream(
        self,
        messages: list[dict],
        system: str,
        tools: list[dict],
        model: str,
        max_tokens: int,
        interrupt_event: Optional[asyncio.Event] = None,
    ) -> AsyncIterator[StreamChunk]:

⚠️ Potential issue | 🟠 Major

Missing error handling for OpenAI API exceptions.

Unlike async_complete (lines 194-199), this method does not catch and translate OpenAI exceptions (AuthenticationError, RateLimitError, APIConnectionError). An API failure during streaming will propagate raw OpenAI exceptions to callers.

Also, the tools parameter is typed as list[dict] while other methods use list[Tool]. This inconsistency means callers must know to pass different formats depending on the method.

Proposed fix to add error handling wrapper
     async def async_stream(
         self,
         messages: list[dict],
         system: str,
-        tools: list[dict],
+        tools: list[Tool],
         model: str,
         max_tokens: int,
         interrupt_event: Optional[asyncio.Event] = None,
     ) -> AsyncIterator[StreamChunk]:
         """Stream using OpenAI async client, yielding StreamChunk objects.

         Translates OpenAI SSE chunks into the normalized StreamChunk format.
         Tool calls are emitted as tool_use_start chunks; final inputs are
         collected and emitted in the message_stop chunk via tool_inputs_by_id.
         """
         import openai as _openai
+        from codeframe.adapters.llm.base import (
+            LLMAuthError,
+            LLMRateLimitError,
+            LLMConnectionError,
+        )

         if self._async_client is None:
             self._async_client = _openai.AsyncOpenAI(
                 api_key=self.api_key, base_url=self.base_url
             )
         # ... rest of method

-        async for chunk in await self._async_client.chat.completions.create(**kwargs):
+        try:
+            stream = await self._async_client.chat.completions.create(**kwargs)
+        except _openai.AuthenticationError as exc:
+            raise LLMAuthError(str(exc)) from exc
+        except _openai.RateLimitError as exc:
+            raise LLMRateLimitError(str(exc)) from exc
+        except _openai.APIConnectionError as exc:
+            raise LLMConnectionError(str(exc)) from exc
+
+        async for chunk in stream:
             # ... existing loop body

- Add extended_thinking param to async_stream() across all providers;
  streaming_chat.py now passes supports("extended_thinking") flag so providers
  can gate thinking tokens without checking capability themselves
- AnthropicProvider: add betas=[interleaved-thinking] when extended_thinking=True
- OpenAIProvider: defer tool_use_start emission until both id+name are known
- OpenAIProvider: add exception handling (LLMAuthError/RateLimitError/
  LLMConnectionError) to async_stream() matching async_complete() semantics
- OpenAIProvider: extract _convert_raw_tools() helper to avoid duplicate
  tool conversion logic; add logger + warning on JSON decode failures
- streaming_chat.py: remove workspace absolute path from system prompt;
  replace with generic "current workspace" phrase
- base.py + StreamChunk docstring: document tool_use_stop ordering difference
  between Anthropic (interleaved) and OpenAI (batched at end)
- base.py: add comment explaining why async_stream is not @abstractmethod
- streaming_chat.py: add backward-compat comment on provider=None fallback
@claude bot commented Apr 6, 2026

PR 553 Review — Streaming Layer Abstraction

This is a well-structured refactor. The StreamChunk abstraction is clean, the mock provider makes tests much less brittle, and the e2b skip fix is a good housekeeping win. A few specific items worth addressing:


Bug: dead try/except in anthropic.py (lines 217-219)

try:
    kwargs["betas"] = ["interleaved-thinking-2025-05-14"]
except Exception:  # pragma: no cover
    pass

Dict key assignment cannot raise an exception, so this try/except is unreachable dead code. The Anthropic SDK would reject an unsupported betas value when self._async_client.messages.stream(**kwargs) is called, not here. The comment says "degrade gracefully if the running SDK version doesn't recognise the param" — that intent should be caught around the async with block, or you can simply trust that the current SDK version supports the beta header. Either remove the try/except or move it to where the error would actually occur.
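
A sketch of the suggested relocation, with a fake older-SDK object standing in for the real Anthropic client, since only the call site can actually raise:

```python
class OldSDKMessages:
    """Stand-in for an older SDK whose stream() doesn't accept 'betas'."""

    def stream(self, *, model):
        return f"stream({model})"


def open_stream(messages_api, kwargs):
    kwargs["betas"] = ["interleaved-thinking-2025-05-14"]  # plain dict assignment: cannot raise
    try:
        # The call is where an unknown kwarg can actually fail, so guard here.
        return messages_api.stream(**kwargs)
    except TypeError:
        kwargs.pop("betas", None)  # degrade gracefully on older SDKs
        return messages_api.stream(**kwargs)


print(open_stream(OldSDKMessages(), {"model": "claude"}))  # stream(claude)
```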


Interface inconsistency: async_stream() takes list[dict] while async_complete() takes list[Tool]

# base.py
async def async_stream(self, ..., tools: list[dict], ...) -> AsyncIterator[StreamChunk]: ...
async def async_complete(self, ..., tools: list[Tool], ...) -> LLMResponse: ...

This forces callers to pre-serialize tools to dicts before calling async_stream() but pass Tool objects to async_complete(). That's why _convert_raw_tools was added to openai.py — but it's a symptom of the inconsistency rather than a fix. Either normalise to list[Tool] throughout (each provider does its own conversion), or document clearly why async_stream differs. As coderabbit noted, _convert_raw_tools would become unnecessary if the signature is aligned.


System prompt: workspace path removed

The prior implementation included self._workspace_path in the system prompt for explicit path context. The new prompt says "current workspace" without specifying it. This is a reasonable security improvement (avoids leaking internal FS paths to the LLM), and the tools already scope to workspace_path at call time so relative-path tool calls still work. Just confirming this is intentional rather than an accidental deletion.


Minor: MockProvider.async_stream doesn't track extended_thinking

The mock accepts the extended_thinking flag but never records it. Consider tracking it (similar to calls) so tests can assert the flag was passed correctly — especially useful once StreamingChatAdapter derives the flag from provider.supports().
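A trimmed sketch of what recording the flag could look like (the attribute names `calls` and the async_stream signature are simplified from the review's description, not copied from mock.py):

```python
import asyncio

class MockProvider:
    def __init__(self):
        self.calls: list[dict] = []

    async def async_stream(self, messages, extended_thinking: bool = False):
        # Record the call metadata, including the flag, before yielding,
        # so tests can assert it was forwarded.
        self.calls.append(
            {"messages": messages, "extended_thinking": extended_thinking}
        )
        yield "chunk"

provider = MockProvider()

async def consume():
    async for _ in provider.async_stream(
        [{"role": "user", "content": "hi"}], extended_thinking=True
    ):
        pass

asyncio.run(consume())
print(provider.calls[0]["extended_thinking"])  # → True
```

Once StreamingChatAdapter derives the flag from provider.supports(), an assertion on calls[0]["extended_thinking"] verifies the wiring end to end.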


Summary

The abstraction layer is the right direction and the implementation is mostly correct. Two items worth fixing before merge:

  1. Remove or relocate the dead try/except around kwargs["betas"] — it doesn't do what the comment claims
  2. Align tools parameter type across async_stream() and async_complete() to remove _convert_raw_tools and its duplication

Everything else is nitpick-level. The migration from patched SDK internals to MockProvider is a clear improvement in test quality.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (1)
codeframe/adapters/llm/anthropic.py (1)

261-276: ⚠️ Potential issue | 🔴 Critical

Await get_final_message() before reading stop_reason or usage.

Line 261 currently stores a coroutine, so the attribute reads below will fail at runtime. Anthropic's own async streaming example awaits stream.get_final_message(), and this was already called out in an earlier review pass.

🐛 Proposed fix
-                    final_msg = stream.get_final_message()
+                    final_msg = await stream.get_final_message()
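The failure mode is easy to reproduce in isolation — calling an async method without await returns a coroutine object, so attribute reads hit the coroutine, not the message (the fake classes below only mimic the SDK shape described above):

```python
import asyncio

class FakeMessage:
    stop_reason = "end_turn"

class FakeStream:
    async def get_final_message(self):
        return FakeMessage()

async def demo():
    stream = FakeStream()
    unawaited = stream.get_final_message()       # coroutine object, not a message
    missing = hasattr(unawaited, "stop_reason")  # False: read hits the coroutine
    unawaited.close()                            # avoid "never awaited" warning
    final_msg = await stream.get_final_message() # the proposed fix
    return missing, final_msg.stop_reason

missing, stop_reason = asyncio.run(demo())
print(missing, stop_reason)  # → False end_turn
```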
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/adapters/llm/anthropic.py` around lines 261 - 276, The code assigns
stream.get_final_message() to final_msg without awaiting, so later attribute
access (final_msg.stop_reason, final_msg.usage, final_msg.content) will fail;
update the block in the stream handling (where final_msg is set) to await
stream.get_final_message() (e.g., final_msg = await stream.get_final_message()),
then use final_msg.stop_reason, final_msg.usage.input_tokens/output_tokens, and
iterate final_msg.content to build tool_inputs_by_id as currently written.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@codeframe/adapters/llm/anthropic.py`:
- Around line 206-212: async_stream is forwarding raw message dicts while
complete() and async_complete() use _convert_messages(messages); update
async_stream to normalize messages by calling _convert_messages(messages) before
building/forwarding kwargs so the same tool_calls/tool_results payloads work for
streaming; locate async_stream and replace or augment the messages usage with
converted = self._convert_messages(messages) (or the equivalent method name) and
pass that converted variable into kwargs instead of the original messages.
- Around line 177-179: The try-except currently wraps only the dict assignment
kwargs["betas"] = ..., but the API error will happen on the call to
messages.stream(**kwargs); move the try-except so it encloses
messages.stream(**kwargs) instead of the dict assignment. Specifically, keep
setting kwargs["betas"] when extended_thinking is true, but call
messages.stream(**kwargs) inside a try block and catch the API error that
indicates unsupported betas (or a generic request/HTTP exception), then degrade
by removing or ignoring kwargs["betas"] and retrying or falling back to non-beta
streaming so extended_thinking silently degrades as supports() promises. Ensure
you reference the same variables and methods: kwargs, "betas", messages.stream,
and the extended_thinking handling.

In `@codeframe/adapters/llm/mock.py`:
- Around line 190-223: The default branch of async_stream ignores the provider's
response queue and logging; modify MockProvider.async_stream so when no
preconfigured stream_chunks are present it (1) increments self.call_count and
sets self.last_call with the same call metadata used by stream()/complete(), (2)
consumes from self.responses (or uses self.default_response) and invokes
self.response_handler to generate the StreamChunk sequence instead of hardcoding
chunks, and (3) yields those chunks while still honoring interrupt_event;
reference async_stream, response_handler, self.responses, self.default_response,
self.call_count and self.last_call to locate and update the logic.

---

Duplicate comments:
In `@codeframe/adapters/llm/anthropic.py`:
- Around line 261-276: The code assigns stream.get_final_message() to final_msg
without awaiting, so later attribute access (final_msg.stop_reason,
final_msg.usage, final_msg.content) will fail; update the block in the stream
handling (where final_msg is set) to await stream.get_final_message() (e.g.,
final_msg = await stream.get_final_message()), then use final_msg.stop_reason,
final_msg.usage.input_tokens/output_tokens, and iterate final_msg.content to
build tool_inputs_by_id as currently written.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f5c1219a-b00b-4ab5-911f-ca2cae7fd7d2

📥 Commits

Reviewing files that changed from the base of the PR and between b60eb7a and a57085f.

📒 Files selected for processing (5)
  • codeframe/adapters/llm/anthropic.py
  • codeframe/adapters/llm/base.py
  • codeframe/adapters/llm/mock.py
  • codeframe/adapters/llm/openai.py
  • codeframe/core/adapters/streaming_chat.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • codeframe/adapters/llm/openai.py

Comment on lines +177 to +179
def supports(self, capability: str) -> bool:
"""Return True for capabilities this provider supports."""
return capability == "extended_thinking"

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cat -n codeframe/adapters/llm/anthropic.py | sed -n '170,230p'

Repository: frankbria/codeframe

Length of output: 2756


🏁 Script executed:

cat -n codeframe/adapters/llm/anthropic.py | sed -n '100,220p'

Repository: frankbria/codeframe

Length of output: 4957


🏁 Script executed:

cat -n codeframe/adapters/llm/anthropic.py | grep -A 30 "def stream"

Repository: frankbria/codeframe

Length of output: 1156


🏁 Script executed:

cat -n codeframe/adapters/llm/anthropic.py | sed -n '279,340p'

Repository: frankbria/codeframe

Length of output: 2665


🏁 Script executed:

cat -n codeframe/adapters/llm/anthropic.py | head -100

Repository: frankbria/codeframe

Length of output: 3760


Move the try-except wrapping to the actual API call, not the dict assignment.

The try-except at lines 217–220 wraps only kwargs["betas"] = ..., a dict assignment that cannot fail. If Anthropic's API rejects the unsupported-beta header, the error occurs at line 224 during messages.stream(**kwargs), which is outside the try-except. This means when extended_thinking=True is used but the API doesn't support the beta feature, an unhandled exception will propagate instead of silently degrading as the docstring claims and as supports() implies.

Wrap the actual messages.stream() call with the try-except to catch unsupported-beta errors from the API.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/adapters/llm/anthropic.py` around lines 177 - 179, The try-except
currently wraps only the dict assignment kwargs["betas"] = ..., but the API
error will happen on the call to messages.stream(**kwargs); move the try-except
so it encloses messages.stream(**kwargs) instead of the dict assignment.
Specifically, keep setting kwargs["betas"] when extended_thinking is true, but
call messages.stream(**kwargs) inside a try block and catch the API error that
indicates unsupported betas (or a generic request/HTTP exception), then degrade
by removing or ignoring kwargs["betas"] and retrying or falling back to non-beta
streaming so extended_thinking silently degrades as supports() promises. Ensure
you reference the same variables and methods: kwargs, "betas", messages.stream,
and the extended_thinking handling.

Comment on lines +206 to +212
kwargs: dict = {
"model": model,
"system": system,
"messages": messages,
"tools": tools,
"max_tokens": max_tokens,
}

⚠️ Potential issue | 🟠 Major

Convert streamed messages the same way as completions.

complete() and async_complete() both normalize through _convert_messages(messages), but async_stream() forwards the raw dicts. The same tool_calls / tool_results payload that works in completion mode can therefore fail once callers switch to streaming.

🐛 Proposed fix
         kwargs: dict = {
             "model": model,
             "system": system,
-            "messages": messages,
+            "messages": self._convert_messages(messages),
             "tools": tools,
             "max_tokens": max_tokens,
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/adapters/llm/anthropic.py` around lines 206 - 212, async_stream is
forwarding raw message dicts while complete() and async_complete() use
_convert_messages(messages); update async_stream to normalize messages by
calling _convert_messages(messages) before building/forwarding kwargs so the
same tool_calls/tool_results payloads work for streaming; locate async_stream
and replace or augment the messages usage with converted =
self._convert_messages(messages) (or the equivalent method name) and pass that
converted variable into kwargs instead of the original messages.

Comment on lines +190 to +223
async def async_stream(
self,
messages: list[dict],
system: str,
tools: list[dict],
model: str,
max_tokens: int,
interrupt_event: Optional[asyncio.Event] = None,
extended_thinking: bool = False,
) -> AsyncIterator[StreamChunk]:
"""Yield pre-configured StreamChunk sequences for testing.

Falls back to a minimal text_delta + message_stop pair when no
stream chunks have been configured via add_stream_chunks().
"""
if self.stream_index < len(self.stream_chunks):
chunks = self.stream_chunks[self.stream_index]
self.stream_index += 1
else:
# Default: simple text response followed by message_stop
chunks = [
StreamChunk(type="text_delta", text=self.default_response),
StreamChunk(
type="message_stop",
stop_reason="end_turn",
input_tokens=len(str(messages)),
output_tokens=len(self.default_response),
tool_inputs_by_id={},
),
]
for chunk in chunks:
if interrupt_event and interrupt_event.is_set():
return
yield chunk

⚠️ Potential issue | 🟡 Minor

Keep async_stream() aligned with the rest of MockProvider.

When no explicit stream_chunks are queued, this path ignores response_handler and queued responses, and it never records the call. Tests that switch from complete()/stream() to async_stream() will silently get different behavior, and call_count / last_call stop reflecting streaming usage.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/adapters/llm/mock.py` around lines 190 - 223, The default branch of
async_stream ignores the provider's response queue and logging; modify
MockProvider.async_stream so when no preconfigured stream_chunks are present it
(1) increments self.call_count and sets self.last_call with the same call
metadata used by stream()/complete(), (2) consumes from self.responses (or uses
self.default_response) and invokes self.response_handler to generate the
StreamChunk sequence instead of hardcoding chunks, and (3) yields those chunks
while still honoring interrupt_event; reference async_stream, response_handler,
self.responses, self.default_response, self.call_count and self.last_call to
locate and update the logic.

…tracking

- anthropic.py: await stream.get_final_message() — it IS async in the SDK
  (get_final_message is a coroutine; the test mock masked this)
- anthropic.py: call _convert_messages() before streaming to normalize
  tool_calls/tool_results payload format
- anthropic.py: move extended_thinking betas retry to stream() call site
  rather than the dict assignment
- mock.py: async_stream() now tracks calls in self.calls (same metadata
  as complete()) and derives default chunks from response queue /
  response_handler / default_response
- tests/adapters/test_llm_async.py: add 5 new async_stream() lifecycle tests
  covering default chunks, preconfigured chunks, call tracking, interrupt, and
  extended_thinking flag propagation
@claude

claude bot commented Apr 7, 2026

Follow-up Review

All three issues flagged in my previous review are resolved in the latest commit:

✅ Fixed

  • await stream.get_final_message() — the await is now present (CodeRabbit's latest "critical" duplicate comment is stale; the current diff already has the fix)
  • Dead try/except relocated — it now wraps self._async_client.messages.stream(**kwargs), which is where the SDK would actually raise if the betas param is unsupported
  • _convert_messages() now called in async_stream() before building kwargs — tool_calls/tool_results in conversation history will serialize correctly
  • MockProvider.async_stream tracks extended_thinking in calls — tests can assert on it

Remaining: tools type inconsistency (design debt, not a blocker)

async_stream(tools: list[dict]) vs async_complete(tools: list[Tool]) still diverge, which is why _convert_raw_tools() exists in openai.py. This isn't a merge blocker but worth a follow-up issue to unify — the right direction is list[Tool] everywhere with each provider doing its own serialization.

One minor observation

In MockProvider.async_stream, the response_handler fallback branch doesn't increment self.response_index, which is consistent with how response_handler works in complete() (handler is stateless by design). Fine as-is.

Overall: the abstraction is sound, the three critical fixes landed cleanly, and the test migration from patched SDK internals to MockProvider is a clear improvement. Ready to merge once the tools-type inconsistency is tracked as a follow-up.

@frankbria frankbria merged commit 843de55 into main Apr 7, 2026
10 checks passed


Development

Successfully merging this pull request may close these issues.

[4.D.6] Abstract streaming layer (streaming_chat.py) to use LLMProvider
