Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions src/anthropic/lib/bedrock/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,31 +99,38 @@ def _make_status_error(
body: object,
response: httpx.Response,
) -> APIStatusError:
if response.status_code == 400:
# Bedrock streaming errors arrive as error frames inside the event stream
# of an HTTP 200 OK response. The decoder embeds the frame's real status
# code in the body so we map to the correct exception subclass here.
status_code = response.status_code
if isinstance(body, dict) and body.get("_bedrock_status") is not None:
status_code = int(body["_bedrock_status"])

if status_code == 400:
return _exceptions.BadRequestError(err_msg, response=response, body=body)

if response.status_code == 401:
if status_code == 401:
return _exceptions.AuthenticationError(err_msg, response=response, body=body)

if response.status_code == 403:
if status_code == 403:
return _exceptions.PermissionDeniedError(err_msg, response=response, body=body)

if response.status_code == 404:
if status_code == 404:
return _exceptions.NotFoundError(err_msg, response=response, body=body)

if response.status_code == 409:
if status_code == 409:
return _exceptions.ConflictError(err_msg, response=response, body=body)

if response.status_code == 422:
if status_code == 422:
return _exceptions.UnprocessableEntityError(err_msg, response=response, body=body)

if response.status_code == 429:
if status_code == 429:
return _exceptions.RateLimitError(err_msg, response=response, body=body)

if response.status_code == 503:
if status_code == 503:
return _exceptions.ServiceUnavailableError(err_msg, response=response, body=body)

if response.status_code >= 500:
if status_code >= 500:
return _exceptions.InternalServerError(err_msg, response=response, body=body)
return APIStatusError(err_msg, response=response, body=body)

Expand Down
27 changes: 17 additions & 10 deletions src/anthropic/lib/bedrock/_mantle.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,34 +66,41 @@ def _make_status_error(
body: object,
response: httpx.Response,
) -> APIStatusError:
if response.status_code == 400:
# Bedrock streaming errors arrive as error frames inside the event stream
# of an HTTP 200 OK response. The decoder embeds the frame's real status
# code in the body so we map to the correct exception subclass here.
status_code = response.status_code
if isinstance(body, dict) and body.get("_bedrock_status") is not None:
status_code = int(body["_bedrock_status"])

if status_code == 400:
return _exceptions.BadRequestError(err_msg, response=response, body=body)

if response.status_code == 401:
if status_code == 401:
return _exceptions.AuthenticationError(err_msg, response=response, body=body)

if response.status_code == 403:
if status_code == 403:
return _exceptions.PermissionDeniedError(err_msg, response=response, body=body)

if response.status_code == 404:
if status_code == 404:
return _exceptions.NotFoundError(err_msg, response=response, body=body)

if response.status_code == 409:
if status_code == 409:
return _exceptions.ConflictError(err_msg, response=response, body=body)

if response.status_code == 413:
if status_code == 413:
return _exceptions.RequestTooLargeError(err_msg, response=response, body=body)

if response.status_code == 422:
if status_code == 422:
return _exceptions.UnprocessableEntityError(err_msg, response=response, body=body)

if response.status_code == 429:
if status_code == 429:
return _exceptions.RateLimitError(err_msg, response=response, body=body)

if response.status_code == 529:
if status_code == 529:
return _exceptions.OverloadedError(err_msg, response=response, body=body)

if response.status_code >= 500:
if status_code >= 500:
return _exceptions.InternalServerError(err_msg, response=response, body=body)
return APIStatusError(err_msg, response=response, body=body)

Expand Down
37 changes: 32 additions & 5 deletions src/anthropic/lib/bedrock/_stream_decoder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import json
from typing import TYPE_CHECKING, Iterator, AsyncIterator

from ..._utils import lru_cache
Expand Down Expand Up @@ -37,7 +38,7 @@ def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]:
for event in event_stream_buffer:
message = self._parse_message_from_event(event)
if message:
yield ServerSentEvent(data=message, event="completion")
yield message

async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[ServerSentEvent]:
"""Given an async iterator that yields lines, iterate over it & yield every event encountered"""
Expand All @@ -49,16 +50,42 @@ async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[Ser
for event in event_stream_buffer:
message = self._parse_message_from_event(event)
if message:
yield ServerSentEvent(data=message, event="completion")
yield message

def _parse_message_from_event(self, event: EventStreamMessage) -> ServerSentEvent | None:
    """Translate one decoded AWS event-stream frame into a ``ServerSentEvent``.

    Args:
        event: A frame taken from the event-stream buffer.

    Returns:
        * An ``"error"`` event when the frame's status code is not 200. Its
          data is a JSON document shaped like
          ``{"type": "error", "error": {"type": ..., "message": ...},
          "_bedrock_status": <frame status code>}``.
        * A ``"completion"`` event carrying the decoded chunk bytes when the
          frame is successful and contains a chunk.
        * ``None`` when a successful frame carries no chunk.
    """
    response_dict = event.to_response_dict()
    parsed_response = self.parser.parse(response_dict, get_response_stream_shape())

    status_code = response_dict["status_code"]
    if status_code != 200:
        # Bedrock surfaces errors as non-200 frames inside the event stream
        # (e.g. internalServerException with status 400 or 500). Raising a
        # raw ValueError here would mean callers cannot catch them via the
        # standard anthropic.APIError hierarchy. Instead we emit an SSE
        # error event so the existing error-handling path in Stream /
        # AsyncStream calls _make_status_error and raises the correct
        # APIStatusError subclass.
        exception_type = response_dict.get("headers", {}).get(":exception-type", "unknown")
        raw_body = response_dict.get("body")

        # bytes and bytearray are treated identically on every path, so an
        # undecodable bytearray is never reported as its ``bytearray(b'...')``
        # repr. ``errors="replace"`` keeps a malformed payload reportable.
        is_binary = isinstance(raw_body, (bytes, bytearray))
        fallback_message = raw_body.decode(errors="replace") if is_binary else str(raw_body or "")

        err_message = fallback_message
        try:
            # Strict decoding on purpose: a body that is not valid UTF-8 is
            # reported verbatim (lossily decoded) rather than parsed as JSON.
            body_data = json.loads(raw_body.decode() if is_binary else fallback_message)
        except ValueError:
            # UnicodeDecodeError and json.JSONDecodeError both derive from
            # ValueError; anything else is a genuine bug and should surface.
            body_data = None

        if isinstance(body_data, dict):
            message = body_data.get("message", fallback_message)
            # Only trust a textual message; otherwise keep the whole body.
            if isinstance(message, str):
                err_message = message

        error_body = json.dumps(
            {
                "type": "error",
                "error": {
                    "type": exception_type,
                    "message": err_message,
                },
                # Read back by _make_status_error to pick the exception
                # subclass, since the HTTP response itself is 200.
                "_bedrock_status": status_code,
            }
        )
        return ServerSentEvent(data=error_body, event="error")

    chunk = parsed_response.get("chunk")
    if not chunk:
        return None

    return ServerSentEvent(data=chunk.get("bytes").decode(), event="completion")
117 changes: 117 additions & 0 deletions tests/lib/test_bedrock_stream_decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from __future__ import annotations

import json
from unittest.mock import MagicMock

import httpx
import pytest

from anthropic import _exceptions
from anthropic.lib.bedrock._stream_decoder import AWSEventStreamDecoder


def _make_decoder() -> AWSEventStreamDecoder:
    """Return a decoder whose ``parser`` is a stub that parses every frame to ``{}``."""
    stub_parser = MagicMock()
    stub_parser.parse.return_value = {}

    instance = AWSEventStreamDecoder()
    instance.parser = stub_parser
    return instance


def _make_event(status_code: int, exception_type: str, body: bytes) -> MagicMock:
event = MagicMock()
event.to_response_dict.return_value = {
"status_code": status_code,
"headers": {":exception-type": exception_type},
"body": body,
}
return event


class TestAWSEventStreamDecoderErrors:
    """Checks how ``AWSEventStreamDecoder._parse_message_from_event`` maps frames to SSE events.

    Non-200 frames must become ``"error"`` events that carry the frame's
    status code under ``_bedrock_status``. Successful frames must still
    produce ``"completion"`` events, or ``None`` when there is no chunk.
    """

    def test_non_200_frame_emits_error_sse(self) -> None:
        decoder = _make_decoder()
        body = json.dumps({"message": "Service unavailable"}).encode()
        event = _make_event(529, "overloadedException", body)

        sse = decoder._parse_message_from_event(event)

        assert sse is not None
        assert sse.event == "error"
        data = json.loads(sse.data)
        assert data["type"] == "error"
        assert data["_bedrock_status"] == 529
        assert data["error"]["type"] == "overloadedException"
        assert data["error"]["message"] == "Service unavailable"

    def test_non_200_frame_500_internal_error(self) -> None:
        decoder = _make_decoder()
        body = json.dumps({"message": "Internal failure"}).encode()
        event = _make_event(500, "internalServerException", body)

        sse = decoder._parse_message_from_event(event)

        assert sse is not None
        assert sse.event == "error"
        data = json.loads(sse.data)
        assert data["_bedrock_status"] == 500
        assert data["error"]["type"] == "internalServerException"
        assert data["error"]["message"] == "Internal failure"

    def test_non_200_frame_invalid_utf8_body(self) -> None:
        decoder = _make_decoder()
        body = b"\xff\xfe not valid utf-8 or json"
        event = _make_event(500, "internalServerException", body)

        sse = decoder._parse_message_from_event(event)

        assert sse is not None
        assert sse.event == "error"
        data = json.loads(sse.data)
        assert data["_bedrock_status"] == 500
        assert isinstance(data["error"]["message"], str)
        # The readable part of the payload must survive the lossy decode.
        assert "not valid utf-8 or json" in data["error"]["message"]

    def test_200_frame_with_chunk_emits_completion_sse(self) -> None:
        decoder = _make_decoder()
        payload = b'{"type": "ping"}'
        decoder.parser.parse.return_value = {"chunk": {"bytes": payload}}
        event = _make_event(200, "", b"")

        sse = decoder._parse_message_from_event(event)

        assert sse is not None
        assert sse.event == "completion"
        assert sse.data == payload.decode()

    def test_200_frame_without_chunk_returns_none(self) -> None:
        # _make_decoder's stub parser returns {}, i.e. a frame with no chunk.
        decoder = _make_decoder()
        event = _make_event(200, "", b"")

        assert decoder._parse_message_from_event(event) is None


class TestBaseBedrockClientMakeStatusError:
    """Checks which exception class the Bedrock client's ``_make_status_error`` returns.

    Bodies carrying ``_bedrock_status`` are paired with an HTTP 200 response;
    a body without it is paired with a non-200 response.
    """

    def _make_client(self):
        """Build an ``AnthropicBedrock`` client with placeholder AWS settings."""
        from anthropic import AnthropicBedrock

        bedrock = AnthropicBedrock(
            aws_region="us-east-1",
            aws_access_key="test-key",
            aws_secret_key="test-secret",
        )
        return bedrock

    def test_bedrock_status_400_maps_to_bad_request(self) -> None:
        payload = {
            "type": "error",
            "error": {"type": "validationException", "message": "invalid input"},
            "_bedrock_status": 400,
        }
        ok_response = httpx.Response(200)
        bedrock = self._make_client()

        error = bedrock._make_status_error("invalid input", body=payload, response=ok_response)

        assert isinstance(error, _exceptions.BadRequestError)

    def test_bedrock_status_429_maps_to_rate_limit(self) -> None:
        payload = {
            "type": "error",
            "error": {"type": "throttlingException", "message": "rate limited"},
            "_bedrock_status": 429,
        }
        ok_response = httpx.Response(200)
        bedrock = self._make_client()

        error = bedrock._make_status_error("rate limited", body=payload, response=ok_response)

        assert isinstance(error, _exceptions.RateLimitError)

    def test_bedrock_status_500_maps_to_internal_server_error(self) -> None:
        payload = {
            "type": "error",
            "error": {"type": "internalServerException", "message": "internal"},
            "_bedrock_status": 500,
        }
        ok_response = httpx.Response(200)
        bedrock = self._make_client()

        error = bedrock._make_status_error("internal", body=payload, response=ok_response)

        assert isinstance(error, _exceptions.InternalServerError)

    def test_no_bedrock_status_uses_response_status(self) -> None:
        # No "_bedrock_status" key: the HTTP status on the response decides.
        payload = {"type": "error", "error": {"type": "accessDeniedException"}}
        forbidden_response = httpx.Response(403)
        bedrock = self._make_client()

        error = bedrock._make_status_error("forbidden", body=payload, response=forbidden_response)

        assert isinstance(error, _exceptions.PermissionDeniedError)