Skip to content

Commit b411093

Browse files
Wrap mid-stream httpx.TransportError as APIConnectionError
Mid-stream transport drops during SSE iteration (RemoteProtocolError, ReadError, ConnectError, …) leak through as bare httpx exceptions because the SDK's wrapping in _base_client._request only covers the pre-body request — once the SSE 200 is sent and body iteration starts, _iter_events has no try/except. Customers' standard `except anthropic.APIConnectionError:` retry ladders therefore miss mid-stream drops and have to know to also catch `httpx.TransportError`. This wraps mid-stream TransportError (in both Stream and AsyncStream) as APIConnectionError with the original preserved as __cause__, matching the pattern at _base_client.py:1104. TimeoutException (a TransportError subclass) passes through unchanged so it doesn't get double-wrapped — APITimeoutError is already an APIConnectionError subclass. Found while reproducing DeepSearchQA via the public API: 35/45 terminal question failures were RemoteProtocolError that the standard retry ladder missed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 9aa85c8 commit b411093

2 files changed

Lines changed: 92 additions & 4 deletions

File tree

src/anthropic/_streaming.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import httpx
1313

14+
from ._exceptions import APIConnectionError
1415
from ._utils import is_dict, extract_type_var_from_base
1516

1617
if TYPE_CHECKING:
@@ -72,7 +73,23 @@ def __iter__(self) -> Iterator[_T]:
7273
yield item
7374

7475
def _iter_events(self) -> Iterator[ServerSentEvent]:
75-
yield from self._decoder.iter_bytes(self.response.iter_bytes())
76+
try:
77+
yield from self._decoder.iter_bytes(self.response.iter_bytes())
78+
except httpx.TimeoutException:
79+
# Mid-stream timeouts are already handled by `_base_client._request` for the
80+
# initial request, but the SSE body iteration doesn't go through that path —
81+
# re-raise as-is so callers can distinguish a hung stream from a dropped one.
82+
# APITimeoutError is an APIConnectionError subclass, so customers catching
83+
# the latter will still see it; this clause only exists so the next clause
84+
# doesn't double-wrap it (TimeoutException is also a TransportError).
85+
raise
86+
except httpx.TransportError as exc:
87+
# Mid-stream transport drops (RemoteProtocolError, ReadError, ConnectError, …)
88+
# leak through as bare httpx exceptions because the SDK's wrapping in
89+
# `_base_client._request` only covers the pre-body request. Re-wrap them so
90+
# `except anthropic.APIConnectionError:` catches mid-stream drops the same way
91+
# it catches connection failures, and the original is preserved as `__cause__`.
92+
raise APIConnectionError(message=f"Stream interrupted: {exc}", request=self.response.request) from exc
7693

7794
def __stream__(self) -> Iterator[_T]:
7895
cast_to = cast(Any, self._cast_to)
@@ -226,8 +243,17 @@ async def __aiter__(self) -> AsyncIterator[_T]:
226243
yield item
227244

228245
async def _iter_events(self) -> AsyncIterator[ServerSentEvent]:
229-
async for sse in self._decoder.aiter_bytes(self.response.aiter_bytes()):
230-
yield sse
246+
try:
247+
async for sse in self._decoder.aiter_bytes(self.response.aiter_bytes()):
248+
yield sse
249+
except httpx.TimeoutException:
250+
# See sync `_iter_events` — let timeouts pass through so the next clause
251+
# doesn't double-wrap them (TimeoutException is also a TransportError).
252+
raise
253+
except httpx.TransportError as exc:
254+
# See sync `_iter_events` — wrap mid-stream transport drops so
255+
# `except anthropic.APIConnectionError:` catches them.
256+
raise APIConnectionError(message=f"Stream interrupted: {exc}", request=self.response.request) from exc
231257

232258
async def __stream__(self) -> AsyncIterator[_T]:
233259
cast_to = cast(Any, self._cast_to)

tests/test_streaming.py

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from anthropic import Anthropic, AsyncAnthropic
99
from anthropic._streaming import Stream, AsyncStream, ServerSentEvent
10-
from anthropic._exceptions import APIStatusError
10+
from anthropic._exceptions import APIConnectionError, APIStatusError
1111

1212
_T = TypeVar("_T")
1313

@@ -219,6 +219,68 @@ def body() -> Iterator[bytes]:
219219
assert sse.json() == {"content": "известни"}
220220

221221

222+
@pytest.mark.asyncio
223+
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
224+
async def test_mid_stream_transport_error_is_wrapped(
225+
sync: bool,
226+
client: Anthropic,
227+
async_client: AsyncAnthropic,
228+
) -> None:
229+
"""A transport drop mid-SSE-stream (RemoteProtocolError, ReadError, …) raises
230+
APIConnectionError with the original httpx exception as __cause__, so that
231+
`except anthropic.APIConnectionError:` catches mid-stream drops the same way
232+
it catches initial-connection failures.
233+
"""
234+
235+
def body() -> Iterator[bytes]:
236+
yield b"event: completion\n"
237+
yield b'data: {"foo":1}\n'
238+
yield b"\n"
239+
raise httpx.RemoteProtocolError("peer closed connection without sending complete message body")
240+
241+
request = httpx.Request("POST", "http://test")
242+
if sync:
243+
iterator: Iterator[ServerSentEvent] | AsyncIterator[ServerSentEvent] = Stream(
244+
cast_to=object, client=client, response=httpx.Response(200, content=body(), request=request)
245+
)._iter_events()
246+
else:
247+
iterator = AsyncStream(
248+
cast_to=object, client=async_client, response=httpx.Response(200, content=to_aiter(body()), request=request)
249+
)._iter_events()
250+
251+
# First event arrives normally — the drop is mid-stream, not at connect.
252+
sse = await iter_next(iterator)
253+
assert sse.event == "completion"
254+
255+
with pytest.raises(APIConnectionError) as exc_info:
256+
await iter_next(iterator)
257+
assert isinstance(exc_info.value.__cause__, httpx.RemoteProtocolError)
258+
assert "Stream interrupted" in str(exc_info.value)
259+
260+
261+
@pytest.mark.asyncio
262+
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
263+
async def test_mid_stream_timeout_is_not_wrapped(
264+
sync: bool,
265+
client: Anthropic,
266+
async_client: AsyncAnthropic,
267+
) -> None:
268+
"""TimeoutException is a TransportError subclass, but the wrapping clause must
269+
NOT double-wrap it — APITimeoutError already exists for timeouts and is itself
270+
an APIConnectionError subclass. The bare httpx.TimeoutException should pass
271+
through so callers can map it to APITimeoutError if they want."""
272+
273+
def body() -> Iterator[bytes]:
274+
yield b"event: completion\n"
275+
raise httpx.ReadTimeout("read timeout")
276+
277+
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
278+
279+
with pytest.raises(httpx.ReadTimeout):
280+
await iter_next(iterator)
281+
await iter_next(iterator)
282+
283+
222284
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
223285
async def test_error_type(
224286
sync: bool,

0 commit comments

Comments
 (0)