add retry logic to stream() and stream_typed() on initial connection #8
ayaan-kapoor wants to merge 2 commits into
Greptile Summary
This PR adds retry logic with exponential back-off and jitter to stream() and stream_typed() on the initial connection.
Confidence Score: 4/5
The implementation is correct and safe to merge; the retry loop, back-off math, and The production code in tests/sdk/test_stream_retry.py: the exhaustion scenario needs a retryable method/status combination (GET + 500 or POST + 429) with enough mocked failures to consume all retries.
Important Files Changed
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[stream / stream_typed called] --> B[attempt = 0]
B --> C[Open HTTP stream connection]
C --> D{status >= 400?}
D -- No --> E[Yield SSE events]
E --> F[return]
D -- Yes --> G[read response body\ncreate exc via to_api_error]
G --> H{attempt < max_retries\nAND _should_retry?}
H -- No --> I[raise exc]
H -- Yes --> J[_retry_delay\nexponential back-off + jitter\nor Retry-After header]
J --> K[time.sleep / asyncio.sleep]
K --> L[attempt += 1]
L --> C
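The flow above can be sketched in plain Python. This is a minimal sketch under stated assumptions, not the SDK's code: `ApiError`, `retry_delay`, `stream_with_retry`, the `open_stream` callable, the 0.5 s base, the 8 s cap, and the jitter range are all hypothetical names and values chosen for illustration. Only the shape of the loop (attempt counter, `max_retries` check, Retry-After taking precedence over back-off, sleep, reconnect) comes from the flowchart.

```python
import random
import time
from typing import Callable, Iterator, Optional, Tuple


class ApiError(Exception):
    """Hypothetical error carrying the HTTP status and an optional Retry-After value."""

    def __init__(self, status: int, retry_after: Optional[float] = None) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status
        self.retry_after = retry_after


def retry_delay(attempt: int, retry_after: Optional[float],
                base: float = 0.5, cap: float = 8.0) -> float:
    """Honor Retry-After when present, otherwise use capped exponential back-off."""
    if retry_after is not None:
        return retry_after
    backoff = min(cap, base * (2 ** attempt))
    # Assumed jitter: scale the back-off by a uniform factor in [0.5, 1.0].
    return backoff * random.uniform(0.5, 1.0)


def stream_with_retry(
    open_stream: Callable[[], Tuple[int, Optional[float], Iterator[str]]],
    should_retry: Callable[[ApiError], bool],
    max_retries: int = 2,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[str]:
    """Retry only the initial connection; once events flow, just yield them."""
    attempt = 0
    while True:
        status, retry_after, events = open_stream()
        if status < 400:
            yield from events
            return
        exc = ApiError(status, retry_after)
        if attempt >= max_retries or not should_retry(exc):
            raise exc
        sleep(retry_delay(attempt, exc.retry_after))
        attempt += 1
```

Passing `sleep` as a parameter is a choice made for this sketch so that a caller or test can substitute a recorder; the PR's tests patch `time.sleep` instead.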
Reviews (2): Last reviewed commit: "fix review: correct POST+500 test, fix c..."
    @respx.mock
    def test_stream_retries_on_500_then_yields_events(monkeypatch: pytest.MonkeyPatch, client: Gumloop) -> None:
        monkeypatch.setattr(time, "sleep", lambda _: None)

        session_id = "sess_abc"
        respx.post(f"{_STREAM_BASE}/sessions/{session_id}/stream").mock(
            side_effect=[
                httpx.Response(500, json={}),
                httpx.Response(200, content=_sse('{"type": "done"}'), headers={"content-type": "text/event-stream"}),
            ]
        )

        events = list(client._http.stream("POST", f"sessions/{session_id}/stream"))
        assert len(events) == 1
POST + 500 retry test will fail
_should_retry explicitly returns False when the method is not in _IDEMPOTENT_METHODS and the exception is a ServerError. POST is not in that set, so a 500 from a POST stream is raised immediately on the first attempt — it is never retried. The test issues a POST then expects the second (successful) mock to be consumed, but stream() will raise exc after the first 500 and the list(...) call will propagate the ServerError before events is even assigned.
    # Streaming connections are always GET-like: the caller hasn't mutated state,
    # so it is safe to retry on any transient server error, not just 429.
    _IDEMPOTENT_METHODS = frozenset({"GET", "HEAD", "DELETE", "OPTIONS", "PUT"})
Misleading comment contradicts the retry policy
The comment says "Streaming connections are always GET-like: the caller hasn't mutated state, so it is safe to retry on any transient server error, not just 429." This implies every streaming call will retry on 5xx regardless of method. But _should_retry immediately returns False for POST/PATCH + ServerError, so POST streams are not retried on 500 — only on 429. The comment should either be removed or updated to describe the actual rule ("same as _request(): only idempotent methods retry on 5xx; POST/PATCH only on 429").
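The rule described in this comment can be written as a small predicate. The sketch below is an illustration only: `should_retry` and the two exception classes are hypothetical stand-ins, and the real `_should_retry` may take different arguments or check more conditions.

```python
_IDEMPOTENT_METHODS = frozenset({"GET", "HEAD", "DELETE", "OPTIONS", "PUT"})


class RateLimitError(Exception):
    """Hypothetical stand-in for the SDK's 429 error."""


class ServerError(Exception):
    """Hypothetical stand-in for the SDK's 5xx error."""


def should_retry(method: str, exc: Exception) -> bool:
    """Sketch of the policy from the review comment, not the SDK's actual code."""
    if isinstance(exc, RateLimitError):
        # 429 is retried for every method, including POST and PATCH.
        return True
    if isinstance(exc, ServerError):
        # 5xx is retried only for methods in the idempotent set.
        return method.upper() in _IDEMPOTENT_METHODS
    # Anything else (for example other 4xx errors) is raised immediately.
    return False
```

Under this policy `should_retry("POST", ServerError())` is false, which is why a POST stream that receives a 500 raises on the first attempt.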
    @respx.mock
    def test_async_stream_retries_on_429_then_yields_events() -> None:
        async def run() -> None:
            import asyncio as _asyncio

            slept: list[float] = []

            async def _noop_sleep(s: float) -> None:
                slept.append(s)

            _asyncio.sleep = _noop_sleep  # type: ignore[assignment]

            respx.post(f"{_STREAM_BASE}/sessions/sess_abc/stream").mock(
                side_effect=[
                    httpx.Response(429, headers={"retry-after": "0"}, json={}),
                    httpx.Response(
                        200,
                        content=_sse('{"type": "message"}'),
                        headers={"content-type": "text/event-stream"},
                    ),
                ]
            )

            async with AsyncGumloop(access_token="token") as client:
                events = [e async for e in client._http.stream("POST", "sessions/sess_abc/stream")]

            assert len(events) == 1

        asyncio.run(run())
Global asyncio.sleep patch is not test-isolated
_asyncio.sleep = _noop_sleep replaces the module-level function for the rest of the process and never restores it. Any later test in the same worker that calls asyncio.sleep will get the patched version, regardless of when it imported asyncio. Using monkeypatch.setattr(asyncio, "sleep", _noop_sleep) (as done for the sync time.sleep in the other tests) is the correct approach: it is automatically reverted after the test finishes.
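A scoped patch might look like the sketch below. It swaps in the standard library's `unittest.mock.patch.object` for pytest's `monkeypatch` fixture so that it runs without pytest; both restore the original attribute when the scope ends. `connect_with_backoff` and `run_with_patched_sleep` are hypothetical names, and the coroutine only stands in for an async stream call that backs off once.

```python
import asyncio
from unittest import mock


async def connect_with_backoff() -> str:
    """Hypothetical stand-in for an async stream call that backs off once."""
    await asyncio.sleep(30.0)  # would stall the test if the patch were missing
    return "connected"


def run_with_patched_sleep() -> list:
    slept = []

    async def _noop_sleep(seconds: float) -> None:
        # Record the requested delay instead of waiting.
        slept.append(seconds)

    # The patch is active only inside the with-block and is undone on exit,
    # even if the body raises.
    with mock.patch.object(asyncio, "sleep", _noop_sleep):
        result = asyncio.run(connect_with_backoff())

    assert result == "connected"
    return slept
```

With the pytest fixture, the equivalent would be adding a `monkeypatch` parameter to the test and calling `monkeypatch.setattr(asyncio, "sleep", _noop_sleep)` before running the coroutine.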
What
Add retry logic to HttpClient.stream(), HttpClient.stream_typed(), and their async counterparts for transient failures on the initial connection.

Why
A bot review on #7 flagged that stream() and stream_typed() had no retry logic while _request() and post_to_stream_host() did. Callers using the streaming path would get an immediate RateLimitError or ServerError where the non-streaming path would transparently retry.

Retry only covers the initial connection handshake. Once the server starts sending events we're committed to the response and can't restart without re-delivering events to the caller, so mid-stream errors are left as-is.

Same rules as _request(): idempotent methods retry on both 429 and 5xx; POST/PATCH only retry on 429. max_retries=0 disables it entirely.

Tests
7 new tests in tests/sdk/test_stream_retry.py covering sync and async paths, 429/500 retry-then-succeed, exhaustion after max retries, no-retry on 4xx, and max_retries=0.