feat(adapters): abstract streaming layer behind LLMProvider (#548) · PR #553
- Add StreamChunk dataclass to base.py as normalized streaming type
- Add async_stream() + supports() to LLMProvider ABC
- Implement async_stream() in AnthropicProvider (moves streaming out of
streaming_chat.py; supports("extended_thinking") returns True)
- Implement async_stream() in OpenAIProvider (SSE → StreamChunk)
- Add async_stream() to MockProvider with configurable StreamChunk sequences
- Refactor StreamingChatAdapter to accept LLMProvider; no import anthropic
- Update session_chat_ws.py to construct AnthropicProvider explicitly
- Export StreamChunk from adapters.llm.__init__
- Update streaming chat tests to use MockProvider instead of _async_client patch
- Fix test_e2b_adapter.py to skip TestE2BAgentAdapter when e2b not installed
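For orientation, the normalized type and the ABC hooks above might look roughly like the following; this is a hypothetical sketch with field names taken from this PR's discussion, not the actual `base.py`:

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StreamChunk:
    """Normalized streaming event (sketch; the real definition lives in base.py)."""
    type: str  # "text_delta" | "thinking_delta" | "tool_use_start" | "message_stop" | ...
    text: str = ""
    tool_id: str = ""
    tool_name: str = ""
    stop_reason: Optional[str] = None
    input_tokens: int = 0
    output_tokens: int = 0
    tool_inputs_by_id: dict = field(default_factory=dict)


class LLMProvider:
    """ABC sketch: async_stream() is overridden per provider."""

    def supports(self, capability: str) -> bool:
        # Capabilities are opt-in; the base returns False for everything.
        return False

    async def async_stream(self, messages, system, tools, model, max_tokens,
                           interrupt_event=None):
        raise NotImplementedError
        yield  # unreachable; marks this method as an async generator
```

Concrete providers (Anthropic, OpenAI, Mock) override `async_stream()` and report their capabilities through `supports()`.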
Walkthrough: a provider-agnostic async streaming abstraction was added.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Adapter as StreamingChatAdapter
    participant Provider as LLMProvider<br/>(Anthropic/OpenAI/Mock)
    participant API as External API<br/>(Anthropic/OpenAI)
    Adapter->>Provider: async_stream(messages, system, tools, ...)
    Provider->>API: open/stream request (provider-specific)
    loop stream events
        API-->>Provider: provider event / delta
        Provider->>Provider: normalize -> StreamChunk
        Provider-->>Adapter: emit StreamChunk
        Adapter->>Adapter: handle chunk (text/thinking/tool/cost)
    end
    API-->>Provider: completion / final message
    Provider->>Adapter: emit final message_stop StreamChunk (tokens, tool_inputs_by_id)
```
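On the adapter side, the loop in the diagram collapses to a single `async for`; here is a runnable toy version (the chunk type and provider are stand-ins, not the real classes):

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class Chunk:  # stand-in for the real StreamChunk
    type: str
    text: str = ""
    stop_reason: Optional[str] = None


async def toy_stream():
    """Stands in for provider.async_stream(): yields normalized chunks."""
    yield Chunk(type="text_delta", text="Hello ")
    yield Chunk(type="text_delta", text="world")
    yield Chunk(type="message_stop", stop_reason="end_turn")


async def consume():
    parts, stop = [], None
    async for chunk in toy_stream():
        if chunk.type == "text_delta":      # accumulate streamed text
            parts.append(chunk.text)
        elif chunk.type == "message_stop":  # final bookkeeping
            stop = chunk.stop_reason
    return "".join(parts), stop


print(asyncio.run(consume()))  # ('Hello world', 'end_turn')
```

Because every provider normalizes to the same chunk shape, this consumer never needs to know which SDK produced the events.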
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Review: feat(adapters): abstract streaming layer behind LLMProvider (#548)

This is a solid architectural improvement.

Issues worth addressing

```python
# session_chat_ws.py
provider = AnthropicProvider()
adapter = StreamingChatAdapter(..., provider=provider)
```

The explicit construction here is correct (composition root pattern).

Minor nits

What's working well
Actionable comments posted: 5
🧹 Nitpick comments (2)
codeframe/adapters/llm/openai.py (2)
235-248: Duplicate tool conversion logic. This inline tool conversion duplicates the logic in `_convert_tools` (lines 401-413). Consider reusing the existing method for consistency and maintainability.

Proposed refactor to reuse `_convert_tools`, if the signature is updated to accept `list[Tool]`:

```diff
 # Convert tools from Anthropic-style (input_schema) to OpenAI format
 if tools:
-    kwargs["tools"] = [
-        {
-            "type": "function",
-            "function": {
-                "name": t["name"],
-                "description": t.get("description", ""),
-                "parameters": t.get("input_schema", {}),
-            },
-        }
-        for t in tools
-    ]
+    kwargs["tools"] = self._convert_tools(tools)
     kwargs["tool_choice"] = "auto"
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@codeframe/adapters/llm/openai.py` around lines 235 - 248, The inline conversion block that builds kwargs["tools"] and sets kwargs["tool_choice"] duplicates logic in _convert_tools; replace the inline conversion with a call to the existing _convert_tools(...) helper (passing the tools list and any expected params) and assign its return to kwargs["tools"], then set kwargs["tool_choice"] = "auto" only if _convert_tools returns a non-empty list; update _convert_tools signature if needed to accept the current Tool shape so the conversion is centralized and consistent with the rest of the module.
305-315: Silent fallback on JSON decode errors may hide malformed tool arguments. When `json.loads` fails, the code silently defaults to `{}` (line 313). This could mask issues where the LLM produces malformed JSON, making debugging difficult. Consider logging a warning when this occurs.

Proposed improvement to add logging:

```diff
+import logging
+
+logger = logging.getLogger(__name__)
+
 # Build tool_inputs_by_id from accumulated partial tool calls
 tool_inputs_by_id: dict = {}
 for tc in partial_tool_calls.values():
     try:
         tool_inputs_by_id[tc["id"]] = json.loads(
             "".join(tc["arguments_parts"]) or "{}"
         )
     except json.JSONDecodeError:
+        logger.warning(
+            "Failed to parse tool arguments for tool %s: %s",
+            tc["name"],
+            "".join(tc["arguments_parts"]),
+        )
         tool_inputs_by_id[tc["id"]] = {}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@codeframe/adapters/llm/openai.py` around lines 305 - 315, The loop that builds tool_inputs_by_id currently swallows JSONDecodeError and silently uses {} for malformed tool arguments; update the exception handler in the block iterating partial_tool_calls so that when json.loads fails you log a warning (including the tool id tc["id"] and the raw arguments string "".join(tc["arguments_parts"])) before falling back to {}), referencing the existing logger used in this module (e.g., module-level logger or process_logger) and keep the existing behavior of yielding StreamChunk(type="tool_use_stop") after each tool call.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@codeframe/adapters/llm/anthropic.py`:
- Around line 248-263: The code calls the async method
stream.get_final_message() without awaiting it, so final_msg becomes a coroutine
and attribute access fails; update the block around stream.get_final_message()
to await the call (i.e., final_msg = await stream.get_final_message()) before
using final_msg.stop_reason, final_msg.content, final_msg.usage, and building
tool_inputs_by_id, ensuring the function containing this logic is async or
otherwise supports awaiting, and preserve the existing StreamChunk construction
that references final_msg.
In `@codeframe/adapters/llm/openai.py`:
- Around line 281-293: The early emission of StreamChunk(type="tool_use_start")
in the partial_tool_calls creation may send empty tool_id/tool_name because
tc_delta.id or tc_delta.function.name can be None; change the logic in the block
that creates partial_tool_calls (where partial_tool_calls[idx] is initialized
from tc_delta) to defer emitting the "tool_use_start" StreamChunk until both
tc_delta.id and tc_delta.function.name are non-empty (or record the partial
entry without emitting and emit the StreamChunk later when a subsequent tc_delta
provides the missing id/name), and ensure updates to the existing
partial_tool_calls entry trigger a single emission of
StreamChunk(type="tool_use_start") with populated tool_id and tool_name instead
of emitting early with empty strings.
- Around line 201-209: The async_stream method lacks the same OpenAI exception
handling and has an inconsistent tools type: update async_stream to mirror
async_complete by wrapping the streaming call in a try/except that catches
OpenAI exceptions (AuthenticationError, RateLimitError, APIConnectionError) and
rethrows/translate them the same way async_complete does so callers receive
consistent OpenAIError semantics, and change the tools parameter type from
list[dict] to list[Tool] to match other methods (refer to async_stream and
async_complete function names and the tools param) so callers can pass the same
Tool objects across methods.
In `@codeframe/core/adapters/streaming_chat.py`:
- Around line 352-357: The system_prompt currently interpolates
self._workspace_path and leaks an absolute path to the LLM; update the prompt
construction in the code where system_prompt is defined (in streaming_chat.py,
the variable system_prompt inside the relevant class/method) to remove
{self._workspace_path} and use a neutral phrase such as "the current workspace"
(e.g., replace the f-string that includes self._workspace_path with a static
string mentioning "the current workspace") so the model retains context without
exposing internal filesystem layout.
- Around line 363-370: Compute a boolean by calling
supports("extended_thinking") and pass it into the provider stream call (the
async_stream(...) invocation) as an explicit keyword option (e.g.,
extended_thinking=extended_thinking) so the provider can emit
thinking_delta/THINKING events only when opted-in; update the async_stream call
site in streaming_chat.py and ensure callers of async_stream (and any provider
implementations) receive this flag so emission of THINKING/thinking_delta is
gated by supports("extended_thinking").
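The gating above is a single boolean computed at the call site; a toy illustration (names are hypothetical, not the adapter's real API):

```python
class ToyProvider:
    """Minimal stand-in: capabilities are declared at construction."""

    def __init__(self, caps=()):
        self._caps = set(caps)

    def supports(self, capability: str) -> bool:
        return capability in self._caps


def stream_options(provider) -> dict:
    # The adapter computes the flag once and passes it explicitly, so
    # providers never consult supports() on themselves mid-stream.
    return {"extended_thinking": provider.supports("extended_thinking")}


print(stream_options(ToyProvider(["extended_thinking"])))  # {'extended_thinking': True}
print(stream_options(ToyProvider()))                       # {'extended_thinking': False}
```

Providers that never report the capability simply receive `extended_thinking=False` and skip emitting thinking deltas.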
---
Nitpick comments:
In `@codeframe/adapters/llm/openai.py`:
- Around line 235-248: The inline conversion block that builds kwargs["tools"]
and sets kwargs["tool_choice"] duplicates logic in _convert_tools; replace the
inline conversion with a call to the existing _convert_tools(...) helper
(passing the tools list and any expected params) and assign its return to
kwargs["tools"], then set kwargs["tool_choice"] = "auto" only if _convert_tools
returns a non-empty list; update _convert_tools signature if needed to accept
the current Tool shape so the conversion is centralized and consistent with the
rest of the module.
- Around line 305-315: The loop that builds tool_inputs_by_id currently swallows
JSONDecodeError and silently uses {} for malformed tool arguments; update the
exception handler in the block iterating partial_tool_calls so that when
json.loads fails you log a warning (including the tool id tc["id"] and the raw
arguments string "".join(tc["arguments_parts"])) before falling back to {}),
referencing the existing logger used in this module (e.g., module-level logger
or process_logger) and keep the existing behavior of yielding
StreamChunk(type="tool_use_stop") after each tool call.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 587f1db9-df00-40c1-bb30-77159a3c2c06
📒 Files selected for processing (9)
- codeframe/adapters/llm/__init__.py
- codeframe/adapters/llm/anthropic.py
- codeframe/adapters/llm/base.py
- codeframe/adapters/llm/mock.py
- codeframe/adapters/llm/openai.py
- codeframe/core/adapters/streaming_chat.py
- codeframe/ui/routers/session_chat_ws.py
- tests/adapters/test_e2b_adapter.py
- tests/core/test_streaming_chat.py
codeframe/adapters/llm/anthropic.py
Outdated
```python
final_msg = stream.get_final_message()
stop_reason = final_msg.stop_reason or "end_turn"

# Build tool_inputs_by_id from final content blocks
tool_inputs_by_id: dict = {}
if hasattr(final_msg, "content"):
    for block in final_msg.content:
        if getattr(block, "type", None) == "tool_use" and hasattr(block, "id"):
            tool_inputs_by_id[block.id] = getattr(block, "input", {})

yield StreamChunk(
    type="message_stop",
    stop_reason=stop_reason,
    input_tokens=final_msg.usage.input_tokens,
    output_tokens=final_msg.usage.output_tokens,
    tool_inputs_by_id=tool_inputs_by_id,
```
🧩 Analysis chain
🌐 Web query:
In the official Anthropic Python SDK async streaming API, does `AsyncMessageStream.get_final_message()` need to be awaited inside `async with client.messages.stream(...) as stream`?
💡 Result:
No.
AsyncMessageStream.get_final_message() is an async def, so it must be awaited, but it does not need to be awaited inside the async with client.messages.stream(...) as stream: block. In the official SDK examples, it’s awaited after the context manager, because the stream has already been fully consumed, so get_final_message() can return immediately. [1]
You only need to call/await it inside the async with block if you haven’t finished consuming the stream yet—because get_final_message() will itself wait for completion via await self.until_done(). [2]
Sources: README example usage [1], SDK implementation of AsyncMessageStream.get_final_message() [2].
[1] (github.com)
[2] (github.com)
Citations:
- 1: https://github.com/anthropics/anthropic-sdk-python?utm_source=openai
- 2: https://github.com/anthropics/anthropic-sdk-python/blob/main/src/anthropic/lib/streaming/_messages.py
🏁 Script executed:

```shell
cd codeframe && git ls-files adapters/llm/anthropic.py
```

Repository: frankbria/codeframe
Length of output: 88

🏁 Script executed:

```shell
cat -n codeframe/adapters/llm/anthropic.py | sed -n '240,270p'
```

Repository: frankbria/codeframe
Length of output: 1630

🏁 Script executed:

```shell
cat -n codeframe/adapters/llm/anthropic.py | sed -n '180,250p'
```

Repository: frankbria/codeframe
Length of output: 3369
Await the async stream's final message before reading it.
On line 248, stream.get_final_message() is asynchronous but called without await. Without awaiting, final_msg is a coroutine object, causing the attribute accesses on lines 249, 254–256, and 261–262 to fail at runtime.
🐛 Proposed fix
```diff
-final_msg = stream.get_final_message()
+final_msg = await stream.get_final_message()
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-final_msg = stream.get_final_message()
+final_msg = await stream.get_final_message()
 stop_reason = final_msg.stop_reason or "end_turn"
 # Build tool_inputs_by_id from final content blocks
 tool_inputs_by_id: dict = {}
 if hasattr(final_msg, "content"):
     for block in final_msg.content:
         if getattr(block, "type", None) == "tool_use" and hasattr(block, "id"):
             tool_inputs_by_id[block.id] = getattr(block, "input", {})
 yield StreamChunk(
     type="message_stop",
     stop_reason=stop_reason,
     input_tokens=final_msg.usage.input_tokens,
     output_tokens=final_msg.usage.output_tokens,
     tool_inputs_by_id=tool_inputs_by_id,
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@codeframe/adapters/llm/anthropic.py` around lines 248 - 263, The code calls
the async method stream.get_final_message() without awaiting it, so final_msg
becomes a coroutine and attribute access fails; update the block around
stream.get_final_message() to await the call (i.e., final_msg = await
stream.get_final_message()) before using final_msg.stop_reason,
final_msg.content, final_msg.usage, and building tool_inputs_by_id, ensuring the
function containing this logic is async or otherwise supports awaiting, and
preserve the existing StreamChunk construction that references final_msg.
```python
async def async_stream(
    self,
    messages: list[dict],
    system: str,
    tools: list[dict],
    model: str,
    max_tokens: int,
    interrupt_event: Optional[asyncio.Event] = None,
) -> AsyncIterator[StreamChunk]:
```
Missing error handling for OpenAI API exceptions.
Unlike async_complete (lines 194-199), this method does not catch and translate OpenAI exceptions (AuthenticationError, RateLimitError, APIConnectionError). An API failure during streaming will propagate raw OpenAI exceptions to callers.
Also, the tools parameter is typed as list[dict] while other methods use list[Tool]. This inconsistency means callers must know to pass different formats depending on the method.
Proposed fix to add error handling wrapper:

```diff
 async def async_stream(
     self,
     messages: list[dict],
     system: str,
-    tools: list[dict],
+    tools: list[Tool],
     model: str,
     max_tokens: int,
     interrupt_event: Optional[asyncio.Event] = None,
 ) -> AsyncIterator[StreamChunk]:
     """Stream using OpenAI async client, yielding StreamChunk objects.

     Translates OpenAI SSE chunks into the normalized StreamChunk format.
     Tool calls are emitted as tool_use_start chunks; final inputs are
     collected and emitted in the message_stop chunk via tool_inputs_by_id.
     """
     import openai as _openai
+    from codeframe.adapters.llm.base import (
+        LLMAuthError,
+        LLMRateLimitError,
+        LLMConnectionError,
+    )
     if self._async_client is None:
         self._async_client = _openai.AsyncOpenAI(
             api_key=self.api_key, base_url=self.base_url
         )
     # ... rest of method
-    async for chunk in await self._async_client.chat.completions.create(**kwargs):
+    try:
+        stream = await self._async_client.chat.completions.create(**kwargs)
+    except _openai.AuthenticationError as exc:
+        raise LLMAuthError(str(exc)) from exc
+    except _openai.RateLimitError as exc:
+        raise LLMRateLimitError(str(exc)) from exc
+    except _openai.APIConnectionError as exc:
+        raise LLMConnectionError(str(exc)) from exc
+
+    async for chunk in stream:
         # ... existing loop body
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@codeframe/adapters/llm/openai.py` around lines 201 - 209, The async_stream
method lacks the same OpenAI exception handling and has an inconsistent tools
type: update async_stream to mirror async_complete by wrapping the streaming
call in a try/except that catches OpenAI exceptions (AuthenticationError,
RateLimitError, APIConnectionError) and rethrows/translate them the same way
async_complete does so callers receive consistent OpenAIError semantics, and
change the tools parameter type from list[dict] to list[Tool] to match other
methods (refer to async_stream and async_complete function names and the tools
param) so callers can pass the same Tool objects across methods.
- Add extended_thinking param to async_stream() across all providers;
streaming_chat.py now passes supports("extended_thinking") flag so providers
can gate thinking tokens without checking capability themselves
- AnthropicProvider: add betas=[interleaved-thinking] when extended_thinking=True
- OpenAIProvider: defer tool_use_start emission until both id+name are known
- OpenAIProvider: add exception handling (LLMAuthError/RateLimitError/
LLMConnectionError) to async_stream() matching async_complete() semantics
- OpenAIProvider: extract _convert_raw_tools() helper to avoid duplicate
tool conversion logic; add logger + warning on JSON decode failures
- streaming_chat.py: remove workspace absolute path from system prompt;
replace with generic "current workspace" phrase
- base.py + StreamChunk docstring: document tool_use_stop ordering difference
between Anthropic (interleaved) and OpenAI (batched at end)
- base.py: add comment explaining why async_stream is not @abstractmethod
- streaming_chat.py: add backward-compat comment on provider=None fallback
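The betas retry in the list above follows a try-then-fallback shape; here is a generic sketch (the real code wraps the Anthropic SDK's `client.messages.stream`, which is not modeled here, and `open_stream`/`picky_api` are hypothetical names):

```python
def open_stream(start, kwargs: dict):
    """Attempt the beta-enabled call; on rejection, retry without 'betas'.

    `start` stands in for client.messages.stream. For the sketch, any
    exception on a beta-enabled attempt is treated as "beta unsupported".
    """
    try:
        return start(**kwargs)
    except Exception:
        if "betas" not in kwargs:
            raise  # nothing to degrade; surface the real error
        # Degrade silently, as supports("extended_thinking") promises.
        fallback = {k: v for k, v in kwargs.items() if k != "betas"}
        return start(**fallback)


def picky_api(**kwargs):
    """Toy API that rejects beta headers outright."""
    if "betas" in kwargs:
        raise RuntimeError("unsupported beta header")
    return "stream"


print(open_stream(picky_api, {"betas": ["interleaved-thinking"], "model": "m"}))  # stream
```

The key point from the review is that the try/except must enclose the API call itself, not the dict assignment that sets `kwargs["betas"]`.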
PR 553 Review — Streaming Layer Abstraction

This is a well-structured refactor.
Actionable comments posted: 3
♻️ Duplicate comments (1)
codeframe/adapters/llm/anthropic.py (1)
261-276: ⚠️ Potential issue | 🔴 Critical

Await `get_final_message()` before reading `stop_reason` or `usage`. Line 261 currently stores a coroutine, so the attribute reads below will fail at runtime. Anthropic's own async streaming example awaits `stream.get_final_message()`, and this was already called out in an earlier review pass. (github.com)

🐛 Proposed fix

```diff
-final_msg = stream.get_final_message()
+final_msg = await stream.get_final_message()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@codeframe/adapters/llm/anthropic.py` around lines 261 - 276, The code assigns stream.get_final_message() to final_msg without awaiting, so later attribute access (final_msg.stop_reason, final_msg.usage, final_msg.content) will fail; update the block in the stream handling (where final_msg is set) to await stream.get_final_message() (e.g., final_msg = await stream.get_final_message()), then use final_msg.stop_reason, final_msg.usage.input_tokens/output_tokens, and iterate final_msg.content to build tool_inputs_by_id as currently written.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@codeframe/adapters/llm/anthropic.py`:
- Around line 206-212: async_stream is forwarding raw message dicts while
complete() and async_complete() use _convert_messages(messages); update
async_stream to normalize messages by calling _convert_messages(messages) before
building/forwarding kwargs so the same tool_calls/tool_results payloads work for
streaming; locate async_stream and replace or augment the messages usage with
converted = self._convert_messages(messages) (or the equivalent method name) and
pass that converted variable into kwargs instead of the original messages.
- Around line 177-179: The try-except currently wraps only the dict assignment
kwargs["betas"] = ..., but the API error will happen on the call to
messages.stream(**kwargs); move the try-except so it encloses
messages.stream(**kwargs) instead of the dict assignment. Specifically, keep
setting kwargs["betas"] when extended_thinking is true, but call
messages.stream(**kwargs) inside a try block and catch the API error that
indicates unsupported betas (or a generic request/HTTP exception), then degrade
by removing or ignoring kwargs["betas"] and retrying or falling back to non-beta
streaming so extended_thinking silently degrades as supports() promises. Ensure
you reference the same variables and methods: kwargs, "betas", messages.stream,
and the extended_thinking handling.
In `@codeframe/adapters/llm/mock.py`:
- Around line 190-223: The default branch of async_stream ignores the provider's
response queue and logging; modify MockProvider.async_stream so when no
preconfigured stream_chunks are present it (1) increments self.call_count and
sets self.last_call with the same call metadata used by stream()/complete(), (2)
consumes from self.responses (or uses self.default_response) and invokes
self.response_handler to generate the StreamChunk sequence instead of hardcoding
chunks, and (3) yields those chunks while still honoring interrupt_event;
reference async_stream, response_handler, self.responses, self.default_response,
self.call_count and self.last_call to locate and update the logic.
---
Duplicate comments:
In `@codeframe/adapters/llm/anthropic.py`:
- Around line 261-276: The code assigns stream.get_final_message() to final_msg
without awaiting, so later attribute access (final_msg.stop_reason,
final_msg.usage, final_msg.content) will fail; update the block in the stream
handling (where final_msg is set) to await stream.get_final_message() (e.g.,
final_msg = await stream.get_final_message()), then use final_msg.stop_reason,
final_msg.usage.input_tokens/output_tokens, and iterate final_msg.content to
build tool_inputs_by_id as currently written.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f5c1219a-b00b-4ab5-911f-ca2cae7fd7d2
📒 Files selected for processing (5)
- codeframe/adapters/llm/anthropic.py
- codeframe/adapters/llm/base.py
- codeframe/adapters/llm/mock.py
- codeframe/adapters/llm/openai.py
- codeframe/core/adapters/streaming_chat.py
🚧 Files skipped from review as they are similar to previous changes (1)
- codeframe/adapters/llm/openai.py
```python
def supports(self, capability: str) -> bool:
    """Return True for capabilities this provider supports."""
    return capability == "extended_thinking"
```
🧩 Analysis chain
🏁 Script executed:

```shell
cat -n codeframe/adapters/llm/anthropic.py | sed -n '170,230p'
```

Repository: frankbria/codeframe
Length of output: 2756

🏁 Script executed:

```shell
cat -n codeframe/adapters/llm/anthropic.py | sed -n '100,220p'
```

Repository: frankbria/codeframe
Length of output: 4957

🏁 Script executed:

```shell
cat -n codeframe/adapters/llm/anthropic.py | grep -A 30 "def stream"
```

Repository: frankbria/codeframe
Length of output: 1156

🏁 Script executed:

```shell
cat -n codeframe/adapters/llm/anthropic.py | sed -n '279,340p'
```

Repository: frankbria/codeframe
Length of output: 2665

🏁 Script executed:

```shell
cat -n codeframe/adapters/llm/anthropic.py | head -100
```

Repository: frankbria/codeframe
Length of output: 3760
Move the try-except wrapping to the actual API call, not the dict assignment.
The try-except at lines 217–220 wraps only kwargs["betas"] = ..., a dict assignment that cannot fail. If Anthropic's API rejects the unsupported-beta header, the error occurs at line 224 during messages.stream(**kwargs), which is outside the try-except. This means when extended_thinking=True is used but the API doesn't support the beta feature, an unhandled exception will propagate instead of silently degrading as the docstring claims and as supports() implies.
Wrap the actual messages.stream() call with the try-except to catch unsupported-beta errors from the API.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@codeframe/adapters/llm/anthropic.py` around lines 177 - 179, The try-except
currently wraps only the dict assignment kwargs["betas"] = ..., but the API
error will happen on the call to messages.stream(**kwargs); move the try-except
so it encloses messages.stream(**kwargs) instead of the dict assignment.
Specifically, keep setting kwargs["betas"] when extended_thinking is true, but
call messages.stream(**kwargs) inside a try block and catch the API error that
indicates unsupported betas (or a generic request/HTTP exception), then degrade
by removing or ignoring kwargs["betas"] and retrying or falling back to non-beta
streaming so extended_thinking silently degrades as supports() promises. Ensure
you reference the same variables and methods: kwargs, "betas", messages.stream,
and the extended_thinking handling.
```python
kwargs: dict = {
    "model": model,
    "system": system,
    "messages": messages,
    "tools": tools,
    "max_tokens": max_tokens,
}
```
Convert streamed messages the same way as completions.
complete() and async_complete() both normalize through _convert_messages(messages), but async_stream() forwards the raw dicts. The same tool_calls / tool_results payload that works in completion mode can therefore fail once callers switch to streaming.
🐛 Proposed fix
```diff
 kwargs: dict = {
     "model": model,
     "system": system,
-    "messages": messages,
+    "messages": self._convert_messages(messages),
     "tools": tools,
     "max_tokens": max_tokens,
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
kwargs: dict = {
    "model": model,
    "system": system,
    "messages": self._convert_messages(messages),
    "tools": tools,
    "max_tokens": max_tokens,
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@codeframe/adapters/llm/anthropic.py` around lines 206 - 212, async_stream is
forwarding raw message dicts while complete() and async_complete() use
_convert_messages(messages); update async_stream to normalize messages by
calling _convert_messages(messages) before building/forwarding kwargs so the
same tool_calls/tool_results payloads work for streaming; locate async_stream
and replace or augment the messages usage with converted =
self._convert_messages(messages) (or the equivalent method name) and pass that
converted variable into kwargs instead of the original messages.
```python
async def async_stream(
    self,
    messages: list[dict],
    system: str,
    tools: list[dict],
    model: str,
    max_tokens: int,
    interrupt_event: Optional[asyncio.Event] = None,
    extended_thinking: bool = False,
) -> AsyncIterator[StreamChunk]:
    """Yield pre-configured StreamChunk sequences for testing.

    Falls back to a minimal text_delta + message_stop pair when no
    stream chunks have been configured via add_stream_chunks().
    """
    if self.stream_index < len(self.stream_chunks):
        chunks = self.stream_chunks[self.stream_index]
        self.stream_index += 1
    else:
        # Default: simple text response followed by message_stop
        chunks = [
            StreamChunk(type="text_delta", text=self.default_response),
            StreamChunk(
                type="message_stop",
                stop_reason="end_turn",
                input_tokens=len(str(messages)),
                output_tokens=len(self.default_response),
                tool_inputs_by_id={},
            ),
        ]
    for chunk in chunks:
        if interrupt_event and interrupt_event.is_set():
            return
        yield chunk
```
Keep async_stream() aligned with the rest of MockProvider.
When no explicit stream_chunks are queued, this path ignores response_handler and queued responses, and it never records the call. Tests that switch from complete()/stream() to async_stream() will silently get different behavior, and call_count / last_call stop reflecting streaming usage.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@codeframe/adapters/llm/mock.py` around lines 190 - 223, The default branch of
async_stream ignores the provider's response queue and logging; modify
MockProvider.async_stream so when no preconfigured stream_chunks are present it
(1) increments self.call_count and sets self.last_call with the same call
metadata used by stream()/complete(), (2) consumes from self.responses (or uses
self.default_response) and invokes self.response_handler to generate the
StreamChunk sequence instead of hardcoding chunks, and (3) yields those chunks
while still honoring interrupt_event; reference async_stream, response_handler,
self.responses, self.default_response, self.call_count and self.last_call to
locate and update the logic.
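The alignment the prompt asks for amounts to recording the call before yielding; here is a toy version with hypothetical attribute names (the real MockProvider tracks more metadata):

```python
import asyncio


class MiniMock:
    """Toy mock: async_stream records calls the way complete() would."""

    def __init__(self, default_response="ok"):
        self.default_response = default_response
        self.call_count = 0
        self.last_call = None

    async def async_stream(self, messages, interrupt_event=None):
        # Record up front so call_count/last_call reflect streaming usage too.
        self.call_count += 1
        self.last_call = {"messages": messages}
        for piece in (self.default_response, "<message_stop>"):
            if interrupt_event and interrupt_event.is_set():
                return
            yield piece


async def drive():
    mock = MiniMock()
    chunks = [c async for c in mock.async_stream([{"role": "user", "content": "hi"}])]
    return mock.call_count, chunks


print(asyncio.run(drive()))  # (1, ['ok', '<message_stop>'])
```

Recording the call metadata inside `async_stream` itself is what keeps tests that switch from `complete()` to streaming from silently losing their assertions on `call_count`.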
…tracking

- anthropic.py: await stream.get_final_message() — it IS async in the SDK (get_final_message is a coroutine; the test mock masked this)
- anthropic.py: call _convert_messages() before streaming to normalize tool_calls/tool_results payload format
- anthropic.py: move extended_thinking betas retry to stream() call site rather than the dict assignment
- mock.py: async_stream() now tracks calls in self.calls (same metadata as complete()) and derives default chunks from response queue / response_handler / default_response
- tests/adapters/test_llm_async.py: add 5 new async_stream() lifecycle tests covering default chunks, preconfigured chunks, call tracking, interrupt, and extended_thinking flag propagation
Follow-up Review

All three issues flagged in my previous review are resolved in the latest commit. ✅ Fixed

Remaining: tools type inconsistency (design debt, not a blocker)

Overall: the abstraction is sound, the three critical fixes landed cleanly, and the test migration from patched SDK internals to MockProvider is a clear improvement.
Summary
- Add `StreamChunk` dataclass to `base.py` as a normalized streaming type shared across all providers
- Add `async_stream()` and `supports()` to the `LLMProvider` ABC
- Implement `async_stream()` in `AnthropicProvider` — moves Anthropic SDK streaming out of `streaming_chat.py`; `supports("extended_thinking")` returns `True`
- Implement `async_stream()` in `OpenAIProvider` — translates OpenAI SSE chunks to `StreamChunk`
- Add `async_stream()` to `MockProvider` with configurable `StreamChunk` sequences for testing
- Refactor `StreamingChatAdapter` to accept `LLMProvider`; no `import anthropic` remains
- Update `session_chat_ws.py` to construct `AnthropicProvider` explicitly as the composition root
- Fix `test_e2b_adapter.py` to skip `TestE2BAgentAdapter` when the `e2b` package is not installed

Closes #548
Acceptance Criteria
- `streaming_chat.py` has no `import anthropic`
- Each provider ships an `async_stream()` implementation
- `supports()` returns `False` by default
- Streaming tests run against `MockProvider`

Test plan
- `tests/core/test_streaming_chat.py` — 20 tests, all pass; updated to use `MockProvider.async_stream()` instead of patching `_async_client`
- `tests/adapters/test_llm.py`, `test_llm_async.py`, `test_llm_openai.py` — 62 tests, all pass
- `ruff check .` — no errors
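The interrupt behavior exercised in the new lifecycle tests can be demonstrated in isolation; a self-contained sketch (not the repo's actual test, which drives MockProvider):

```python
import asyncio


async def interruptible(chunks, interrupt_event):
    """Yield chunks until the interrupt event is set (mirrors the mock's loop)."""
    for chunk in chunks:
        if interrupt_event.is_set():
            return
        yield chunk


async def run_interrupted():
    ev = asyncio.Event()
    seen = []
    async for c in interruptible(["a", "b", "c"], ev):
        seen.append(c)
        if c == "b":
            ev.set()  # the caller interrupts mid-stream
    return seen


print(asyncio.run(run_interrupted()))  # ['a', 'b']
```

Checking the event before each yield means the generator stops at the next chunk boundary rather than mid-chunk, which matches how the providers honor `interrupt_event`.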