Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions src/anthropic/lib/bedrock/_stream_decoder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import json
from typing import TYPE_CHECKING, Iterator, AsyncIterator

from ..._utils import lru_cache
Expand Down Expand Up @@ -37,7 +38,7 @@ def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]:
for event in event_stream_buffer:
message = self._parse_message_from_event(event)
if message:
yield ServerSentEvent(data=message, event="completion")
yield message

async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[ServerSentEvent]:
"""Given an async iterator that yields lines, iterate over it & yield every event encountered"""
Expand All @@ -49,16 +50,41 @@ async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[Ser
for event in event_stream_buffer:
message = self._parse_message_from_event(event)
if message:
yield ServerSentEvent(data=message, event="completion")
yield message

def _parse_message_from_event(self, event: EventStreamMessage) -> str | None:
def _parse_message_from_event(self, event: EventStreamMessage) -> ServerSentEvent | None:
response_dict = event.to_response_dict()
parsed_response = self.parser.parse(response_dict, get_response_stream_shape())
if response_dict["status_code"] != 200:
raise ValueError(f"Bad response code, expected 200: {response_dict}")
# Bedrock surfaces errors as non-200 frames inside the event stream
# (e.g. internalServerException with status 400 or 500). Raising a
# raw ValueError here means callers cannot catch them via the
# standard anthropic.APIError hierarchy. Instead we emit an SSE
# error event so the existing error-handling path in Stream /
# AsyncStream calls _make_status_error and raises the correct
# APIStatusError subclass.
exception_type = response_dict.get("headers", {}).get(":exception-type", "unknown")
raw_body: bytes | None = response_dict.get("body")
try:
body_str = raw_body.decode() if isinstance(raw_body, (bytes, bytearray)) else (raw_body or "")
body_data = json.loads(body_str)
err_message = body_data.get("message", body_str)
except Exception:
err_message = str(raw_body)

error_body = json.dumps(
{
"type": "error",
"error": {
"type": exception_type,
"message": err_message,
},
}
)
return ServerSentEvent(data=error_body, event="error")

chunk = parsed_response.get("chunk")
if not chunk:
return None

return chunk.get("bytes").decode() # type: ignore[no-any-return]
return ServerSentEvent(data=chunk.get("bytes").decode(), event="completion") # type: ignore[no-any-return]