From 1eda4ec288a4aea3e3893db8ff5894eee3ef585a Mon Sep 17 00:00:00 2001 From: Nilesh Patil <128893479+nileshpatil6@users.noreply.github.com> Date: Sat, 16 May 2026 20:03:45 +0530 Subject: [PATCH 1/2] fix(bedrock): raise APIStatusError for non-200 SSE error events When Bedrock returns a non-200 status code inside an SSE event (e.g. internalServerException with status 400), the stream decoder was raising a raw ValueError which users could not catch via the standard anthropic.APIError hierarchy. Instead, yield a ServerSentEvent with event="error" containing the Bedrock error body so the existing error-handling path in Stream and AsyncStream calls _make_status_error and raises the appropriate APIStatusError subclass. Fixes #1477 --- src/anthropic/lib/bedrock/_stream_decoder.py | 36 +++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/src/anthropic/lib/bedrock/_stream_decoder.py b/src/anthropic/lib/bedrock/_stream_decoder.py index 02e81a3ca..018bac6b2 100644 --- a/src/anthropic/lib/bedrock/_stream_decoder.py +++ b/src/anthropic/lib/bedrock/_stream_decoder.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json from typing import TYPE_CHECKING, Iterator, AsyncIterator from ..._utils import lru_cache @@ -37,7 +38,7 @@ def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]: for event in event_stream_buffer: message = self._parse_message_from_event(event) if message: - yield ServerSentEvent(data=message, event="completion") + yield message async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[ServerSentEvent]: """Given an async iterator that yields lines, iterate over it & yield every event encountered""" @@ -49,16 +50,41 @@ async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[Ser for event in event_stream_buffer: message = self._parse_message_from_event(event) if message: - yield ServerSentEvent(data=message, event="completion") + yield message - def _parse_message_from_event(self, event: EventStreamMessage) -> str | None: + def _parse_message_from_event(self, event: EventStreamMessage) -> ServerSentEvent | None: response_dict = event.to_response_dict() parsed_response = self.parser.parse(response_dict, get_response_stream_shape()) if response_dict["status_code"] != 200: - raise ValueError(f"Bad response code, expected 200: {response_dict}") + # Bedrock surfaces errors as non-200 frames inside the event stream + # (e.g. internalServerException with status 400 or 500). Raising a + # raw ValueError here means callers cannot catch them via the + # standard anthropic.APIError hierarchy. Instead we emit an SSE + # error event so the existing error-handling path in Stream / + # AsyncStream calls _make_status_error and raises the correct + # APIStatusError subclass. + exception_type = response_dict.get("headers", {}).get(":exception-type", "unknown") + raw_body: bytes | None = response_dict.get("body") + try: + body_str = raw_body.decode() if isinstance(raw_body, (bytes, bytearray)) else (raw_body or "") + body_data = json.loads(body_str) + err_message = body_data.get("message", body_str) + except Exception: + err_message = str(raw_body) + + error_body = json.dumps( + { + "type": "error", + "error": { + "type": exception_type, + "message": err_message, + }, + } + ) + return ServerSentEvent(data=error_body, event="error") chunk = parsed_response.get("chunk") if not chunk: return None - return chunk.get("bytes").decode() # type: ignore[no-any-return] + return ServerSentEvent(data=chunk.get("bytes").decode(), event="completion") # type: ignore[no-any-return] From 002af394521781b430ecdcf3c5fbdea53614f533 Mon Sep 17 00:00:00 2001 From: nileshpatil6 Date: Mon, 18 May 2026 23:25:21 +0530 Subject: [PATCH 2/2] fix(bedrock): map SSE error frames to correct API exception subclasses Bedrock streaming errors arrive as non-200 event frames inside 200 OK HTTP responses. The previous code raised ValueError (untyped) or always fell through to generic APIStatusError because _make_status_error only checked response.status_code (always 200 for Bedrock streams). - embed _bedrock_status in the error SSE body so the real status code propagates through the streaming pipeline - update BaseBedrockClient and BaseMantleClient._make_status_error to prefer _bedrock_status over response.status_code when present - fix bytes fallback: use .decode(errors='replace') instead of str() to avoid b'...' repr noise in error messages - add unit tests covering the non-200 frame path and status mapping --- src/anthropic/lib/bedrock/_client.py | 25 ++-- src/anthropic/lib/bedrock/_mantle.py | 27 +++-- src/anthropic/lib/bedrock/_stream_decoder.py | 3 +- tests/lib/test_bedrock_stream_decoder.py | 117 +++++++++++++++++++ 4 files changed, 152 insertions(+), 20 deletions(-) create mode 100644 tests/lib/test_bedrock_stream_decoder.py diff --git a/src/anthropic/lib/bedrock/_client.py b/src/anthropic/lib/bedrock/_client.py index cda0690df..ded84fea7 100644 --- a/src/anthropic/lib/bedrock/_client.py +++ b/src/anthropic/lib/bedrock/_client.py @@ -99,31 +99,38 @@ def _make_status_error( body: object, response: httpx.Response, ) -> APIStatusError: - if response.status_code == 400: + # Bedrock streaming errors arrive inside 200 OK event frames. + # The decoder embeds the real status code in the body so we map + # to the correct exception subclass here. + status_code = response.status_code + if isinstance(body, dict) and body.get("_bedrock_status") is not None: + status_code = int(body["_bedrock_status"]) + + if status_code == 400: return _exceptions.BadRequestError(err_msg, response=response, body=body) - if response.status_code == 401: + if status_code == 401: return _exceptions.AuthenticationError(err_msg, response=response, body=body) - if response.status_code == 403: + if status_code == 403: return _exceptions.PermissionDeniedError(err_msg, response=response, body=body) - if response.status_code == 404: + if status_code == 404: return _exceptions.NotFoundError(err_msg, response=response, body=body) - if response.status_code == 409: + if status_code == 409: return _exceptions.ConflictError(err_msg, response=response, body=body) - if response.status_code == 422: + if status_code == 422: return _exceptions.UnprocessableEntityError(err_msg, response=response, body=body) - if response.status_code == 429: + if status_code == 429: return _exceptions.RateLimitError(err_msg, response=response, body=body) - if response.status_code == 503: + if status_code == 503: return _exceptions.ServiceUnavailableError(err_msg, response=response, body=body) - if response.status_code >= 500: + if status_code >= 500: return _exceptions.InternalServerError(err_msg, response=response, body=body) return APIStatusError(err_msg, response=response, body=body) diff --git a/src/anthropic/lib/bedrock/_mantle.py b/src/anthropic/lib/bedrock/_mantle.py index 5507462af..c33182e5e 100644 --- a/src/anthropic/lib/bedrock/_mantle.py +++ b/src/anthropic/lib/bedrock/_mantle.py @@ -66,34 +66,41 @@ def _make_status_error( body: object, response: httpx.Response, ) -> APIStatusError: - if response.status_code == 400: + # Bedrock streaming errors arrive inside 200 OK event frames. + # The decoder embeds the real status code in the body so we map + # to the correct exception subclass here. + status_code = response.status_code + if isinstance(body, dict) and body.get("_bedrock_status") is not None: + status_code = int(body["_bedrock_status"]) + + if status_code == 400: return _exceptions.BadRequestError(err_msg, response=response, body=body) - if response.status_code == 401: + if status_code == 401: return _exceptions.AuthenticationError(err_msg, response=response, body=body) - if response.status_code == 403: + if status_code == 403: return _exceptions.PermissionDeniedError(err_msg, response=response, body=body) - if response.status_code == 404: + if status_code == 404: return _exceptions.NotFoundError(err_msg, response=response, body=body) - if response.status_code == 409: + if status_code == 409: return _exceptions.ConflictError(err_msg, response=response, body=body) - if response.status_code == 413: + if status_code == 413: return _exceptions.RequestTooLargeError(err_msg, response=response, body=body) - if response.status_code == 422: + if status_code == 422: return _exceptions.UnprocessableEntityError(err_msg, response=response, body=body) - if response.status_code == 429: + if status_code == 429: return _exceptions.RateLimitError(err_msg, response=response, body=body) - if response.status_code == 529: + if status_code == 529: return _exceptions.OverloadedError(err_msg, response=response, body=body) - if response.status_code >= 500: + if status_code >= 500: return _exceptions.InternalServerError(err_msg, response=response, body=body) return APIStatusError(err_msg, response=response, body=body) diff --git a/src/anthropic/lib/bedrock/_stream_decoder.py b/src/anthropic/lib/bedrock/_stream_decoder.py index 018bac6b2..1d41ecfce 100644 --- a/src/anthropic/lib/bedrock/_stream_decoder.py +++ b/src/anthropic/lib/bedrock/_stream_decoder.py @@ -70,7 +70,7 @@ def _parse_message_from_event(self, event: EventStreamMessage) -> ServerSentEven body_data = json.loads(body_str) err_message = body_data.get("message", body_str) except Exception: - err_message = str(raw_body) + err_message = raw_body.decode(errors="replace") if isinstance(raw_body, bytes) else str(raw_body or "") error_body = json.dumps( { @@ -79,6 +79,7 @@ def _parse_message_from_event(self, event: EventStreamMessage) -> ServerSentEven "type": exception_type, "message": err_message, }, + "_bedrock_status": response_dict["status_code"], } ) return ServerSentEvent(data=error_body, event="error") diff --git a/tests/lib/test_bedrock_stream_decoder.py b/tests/lib/test_bedrock_stream_decoder.py new file mode 100644 index 000000000..c65d5387f --- /dev/null +++ b/tests/lib/test_bedrock_stream_decoder.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import json +from unittest.mock import MagicMock + +import httpx +import pytest + +from anthropic import _exceptions +from anthropic.lib.bedrock._stream_decoder import AWSEventStreamDecoder + + +def _make_decoder() -> AWSEventStreamDecoder: + decoder = AWSEventStreamDecoder() + decoder.parser = MagicMock() + decoder.parser.parse.return_value = {} + return decoder + + +def _make_event(status_code: int, exception_type: str, body: bytes) -> MagicMock: + event = MagicMock() + event.to_response_dict.return_value = { + "status_code": status_code, + "headers": {":exception-type": exception_type}, + "body": body, + } + return event + + +class TestAWSEventStreamDecoderErrors: + def test_non_200_frame_emits_error_sse(self) -> None: + decoder = _make_decoder() + body = json.dumps({"message": "Service unavailable"}).encode() + event = _make_event(529, "overloadedException", body) + + sse = decoder._parse_message_from_event(event) + + assert sse is not None + assert sse.event == "error" + data = json.loads(sse.data) + assert data["_bedrock_status"] == 529 + assert data["error"]["type"] == "overloadedException" + assert data["error"]["message"] == "Service unavailable" + + def test_non_200_frame_500_internal_error(self) -> None: + decoder = _make_decoder() + body = json.dumps({"message": "Internal failure"}).encode() + event = _make_event(500, "internalServerException", body) + + sse = decoder._parse_message_from_event(event) + + assert sse is not None + data = json.loads(sse.data) + assert data["_bedrock_status"] == 500 + assert data["error"]["type"] == "internalServerException" + + def test_non_200_frame_invalid_utf8_body(self) -> None: + decoder = _make_decoder() + body = b"\xff\xfe not valid utf-8 or json" + event = _make_event(500, "internalServerException", body) + + sse = decoder._parse_message_from_event(event) + + assert sse is not None + data = json.loads(sse.data) + assert data["_bedrock_status"] == 500 + assert isinstance(data["error"]["message"], str) + + +class TestBaseBedrockClientMakeStatusError: + def _make_client(self): + from anthropic import AnthropicBedrock + return AnthropicBedrock( + aws_region="us-east-1", + aws_access_key="test-key", + aws_secret_key="test-secret", + ) + + def test_bedrock_status_400_maps_to_bad_request(self) -> None: + client = self._make_client() + response = httpx.Response(200) + body = { + "type": "error", + "error": {"type": "validationException", "message": "invalid input"}, + "_bedrock_status": 400, + } + err = client._make_status_error("invalid input", body=body, response=response) + assert isinstance(err, _exceptions.BadRequestError) + + def test_bedrock_status_429_maps_to_rate_limit(self) -> None: + client = self._make_client() + response = httpx.Response(200) + body = { + "type": "error", + "error": {"type": "throttlingException", "message": "rate limited"}, + "_bedrock_status": 429, + } + err = client._make_status_error("rate limited", body=body, response=response) + assert isinstance(err, _exceptions.RateLimitError) + + def test_bedrock_status_500_maps_to_internal_server_error(self) -> None: + client = self._make_client() + response = httpx.Response(200) + body = { + "type": "error", + "error": {"type": "internalServerException", "message": "internal"}, + "_bedrock_status": 500, + } + err = client._make_status_error("internal", body=body, response=response) + assert isinstance(err, _exceptions.InternalServerError) + + def test_no_bedrock_status_uses_response_status(self) -> None: + client = self._make_client() + response = httpx.Response(403) + body = {"type": "error", "error": {"type": "accessDeniedException"}} + err = client._make_status_error("forbidden", body=body, response=response) + assert isinstance(err, _exceptions.PermissionDeniedError)