Skip to content

Commit 542770b

Browse files
authored
feat: [CHA-2958] error hierarchy + wait_for_task (#261)
* feat: standardized error hierarchy + wait_for_task (CHA-2958) Adds StreamException / StreamApiException / StreamRateLimitException / StreamTransportException / StreamTaskException per the Server-Side SDK Error Handling Spec §9.2. APIError envelope parsing now surfaces unrecoverable, exception_fields, more_info, details, and the raw body on the new StreamApiException class. 429s build StreamRateLimitException with retry_after parsed from Retry-After (RFC 7231 §7.1.3 — integer seconds or HTTP-date, past dates clamp to zero, missing/garbage → None). httpx.RequestError raised by the sync and async transport paths is wrapped into StreamTransportException with an error_type enum (connection_reset / timeout / dns_failure / tls_handshake_failed / unknown) and the original exception preserved via __cause__. The classifier walks __cause__/__context__ to detect ssl.SSLError and socket.gaierror that httpx hides one level down. Adds Stream.wait_for_task and AsyncStream.wait_for_task that poll get_task until terminal state. Failed tasks raise StreamTaskException populated from ErrorResult; timeouts raise StreamTransportException with error_type='timeout'. getstream.base.StreamAPIException (capital API) is now an alias for the new StreamApiException via a module-level __getattr__ that emits DeprecationWarning. The alias points at the same class object, so isinstance / except / pytest.raises keep working without changes; it will be removed one minor cycle after this release. * test: replace mocks in test_exceptions.py with real HTTP server + sockets Honors AGENTS.md "do not use mocks or mock things in general unless you are asked to do that directly" (also flagged by CodeRabbit on PR #261). Two transport-wrapping tests previously used httpx.MockTransport. They now drive real httpx errors: - Connection-refused goes via a freshly-bound-and-closed loopback port. - Read-timeout goes via a real pytest-httpserver instance that sleeps past the client's request_timeout. Six wait_for_task tests previously used _Fake* stand-in client + response classes. They now point a real Stream/AsyncStream client at a pytest- httpserver that returns get-task JSON in the real wire format (nanosecond-epoch created_at/updated_at, ErrorResult payload). Adds pytest-httpserver as a dev dependency. * docs(py): drop spec § refs and mechanism prose from error-handling docstrings
1 parent 35e84b0 commit 542770b

9 files changed

Lines changed: 1211 additions & 63 deletions

File tree

CHANGELOG.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,34 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- Standardized error hierarchy ([CHA-2958](https://linear.app/stream/issue/CHA-2958)).
13+
New exception classes importable from `getstream` (also re-exported from
14+
`getstream.exceptions`):
15+
- `StreamException`: abstract base for every SDK error.
16+
- `StreamApiException`: any HTTP 4xx/5xx response. Carries `status_code`,
17+
`code`, `message`, `exception_fields`, `unrecoverable`,
18+
`raw_response_body`, `more_info`, `details`. The `unrecoverable` flag
19+
from `APIError` is now surfaced (was previously dropped on most paths).
20+
- `StreamRateLimitException`: subclass of `StreamApiException` raised on
21+
HTTP 429. Adds `retry_after: datetime.timedelta | None`, parsed from
22+
`Retry-After` per RFC 7231 (integer seconds or HTTP-date). Missing or
23+
unparseable headers map to `None`; past HTTP-dates clamp to `0`.
24+
- `StreamTransportException`: raised when a network-layer failure (no
25+
HTTP response received) propagates out of `httpx` — connection reset,
26+
timeout, TLS handshake failure, DNS failure. Carries `error_type`
27+
enum (`connection_reset` / `timeout` / `dns_failure` /
28+
`tls_handshake_failed` / `unknown`). The original `httpx` exception
29+
is preserved as `__cause__`.
30+
- `StreamTaskException`: raised by `wait_for_task` when the polled task
31+
ends in `status='failed'`. Carries `task_id`, `error_type`,
32+
`description`, `stack_trace`, `version`.
33+
- `Stream.wait_for_task(task_id, *, poll_interval=1.0, timeout=60.0)` and
34+
the matching async coroutine on `AsyncStream`. Polls `get_task` until the
35+
task reaches a terminal state. On `completed` returns the
36+
`StreamResponse[GetTaskResponse]`; on `failed` raises
37+
`StreamTaskException` populated from `ErrorResult`; on timeout raises
38+
`StreamTransportException(error_type='timeout')`.
39+
1240
- Explicit HTTP connection pool configuration ([CHA-2956](https://linear.app/stream/issue/CHA-2956/connection-pooling)).
1341
Four new kwargs on `Stream(...)` and `AsyncStream(...)`:
1442
- `max_conns_per_host: int`: default `5`
@@ -45,11 +73,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4573

4674
### Changed
4775

76+
- HTTP request errors raised by `httpx` (the `httpx.RequestError` family —
77+
`ConnectError`, `ReadTimeout`, etc.) are now wrapped at the SDK boundary
78+
in `StreamTransportException` so callers handle one Stream error category
79+
instead of catching `httpx.RequestError` separately. The original `httpx`
80+
exception is preserved via `__cause__` (CHA-2958).
4881
- **Default `request_timeout` is now `30.0` seconds (was `6.0`).** Aligns stream-py with the cross-SDK contract in CHA-2956. Existing callers using `timeout=` are unaffected; `timeout` is kept as an alias for `request_timeout`. Callers relying on the 6s ceiling for fail-fast behavior should pass `request_timeout=6.0` (or `timeout=6.0`) explicitly.
4982
- Default HTTP transport now caps connections per host at `5` and closes idle sockets after `55.0s`. Previous default was httpx's `100` max-connections with `5.0s` keep-alive expiry.
5083
- No breaking changes. All existing webhook helpers (`verify_webhook_signature`,
5184
`parse_webhook_event`, `get_event_type`, event type constants) are preserved.
5285

86+
### Deprecated
87+
88+
- `getstream.base.StreamAPIException` (capital `API`) is now an alias for
89+
`getstream.exceptions.StreamApiException` (lowercase `Api`). Importing the
90+
old name emits `DeprecationWarning`; existing `isinstance` / `except` /
91+
`pytest.raises` checks continue to work because the alias resolves to the
92+
same class. The legacy spelling will be removed one minor cycle after this
93+
release (CHA-2958 §10).
94+
5395
### Notes
5496

5597
- Per-call `timeout=httpx.Timeout(...)` continues to work through `.get(...)`, `.post(...)`, etc., and pre-empts the client-level `request_timeout`.

getstream/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,9 @@
1+
from getstream.exceptions import ( # noqa: F401
2+
StreamApiException,
3+
StreamException,
4+
StreamRateLimitException,
5+
StreamTaskException,
6+
StreamTransportException,
7+
)
18
from getstream.stream import Stream # noqa: F401
29
from getstream.stream import AsyncStream # noqa: F401

getstream/base.py

Lines changed: 34 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,15 @@
44
import os
55
import time
66
import uuid
7+
import warnings
78
import asyncio
89
from typing import Any, Dict, List, Optional, Tuple, Type, cast, get_origin
910

10-
from getstream.models import APIError
11-
from getstream.rate_limit import extract_rate_limit
11+
from getstream.exceptions import (
12+
StreamApiException,
13+
build_api_exception,
14+
wrap_transport_error,
15+
)
1216
from getstream.stream_response import StreamResponse
1317
from getstream.generic import T
1418
import httpx
@@ -102,9 +106,7 @@ def _parse_response(
102106
self, response: httpx.Response, data_type: Type[T]
103107
) -> StreamResponse[T]:
104108
if response.status_code >= 399:
105-
raise StreamAPIException(
106-
response=response,
107-
)
109+
raise build_api_exception(response)
108110

109111
try:
110112
parsed_result = json.loads(response.text) if response.text else {}
@@ -118,10 +120,8 @@ def _parse_response(
118120
else:
119121
data = cast(T, parsed_result)
120122

121-
except (ValueError, AttributeError):
122-
raise StreamAPIException(
123-
response=response,
124-
)
123+
except (ValueError, AttributeError) as err:
124+
raise StreamApiException(response=response) from err
125125

126126
return StreamResponse(response, data)
127127

@@ -291,9 +291,12 @@ def _request_sync(
291291
) as span:
292292
call_kwargs = dict(kwargs)
293293
call_kwargs.pop("path_params", None)
294-
response = getattr(self.client, method.lower())(
295-
url_path, params=query_params, *args, **call_kwargs
296-
)
294+
try:
295+
response = getattr(self.client, method.lower())(
296+
url_path, params=query_params, *args, **call_kwargs
297+
)
298+
except httpx.RequestError as err:
299+
raise wrap_transport_error(err) from err
297300
duration = parse_duration_from_body(response.content)
298301
if duration:
299302
span.set_attribute("http.server.duration", duration)
@@ -604,9 +607,12 @@ async def _request_async(
604607
call_kwargs["headers"] = call_kwargs.get("headers", {})
605608
call_kwargs["headers"]["Content-Type"] = "application/json"
606609

607-
response = await getattr(self.client, method.lower())(
608-
url_path, params=query_params, *args, **call_kwargs
609-
)
610+
try:
611+
response = await getattr(self.client, method.lower())(
612+
url_path, params=query_params, *args, **call_kwargs
613+
)
614+
except httpx.RequestError as err:
615+
raise wrap_transport_error(err) from err
610616
duration = parse_duration_from_body(response.content)
611617
if duration:
612618
span.set_attribute("http.server.duration", duration)
@@ -721,54 +727,19 @@ async def delete(
721727
)
722728

723729

724-
class StreamAPIException(Exception):
725-
"""
726-
A custom exception for handling errors from a Stream API response.
727-
728-
This exception is raised when an API call encounters an issue, providing
729-
detailed information from the HTTP response. It attempts to parse the response
730-
content into a structured API error. If the response content is not JSON or
731-
lacks the expected structure, it will simply report the HTTP status code.
732-
733-
Attributes:
734-
api_error (Optional[APIError]): An optional APIError object that is
735-
populated if the response content contains structured error information.
736-
rate_limit_info (RateLimitInfo): Information about the API's rate limiting
737-
controls extracted from the response headers.
738-
http_response (httpx.Response): The full HTTP response object from httpx.
739-
status_code (int): The HTTP status code from the response.
740-
741-
Args:
742-
response (httpx.Response): The HTTP response received from the Stream API.
743-
744-
Raises:
745-
ValueError: If the response content cannot be parsed into JSON, indicating
746-
that the server's response was not in the expected format.
747-
"""
748-
749-
def __init__(self, response: httpx.Response) -> None:
750-
self.api_error: Optional[APIError] = None
751-
self.rate_limit_info = extract_rate_limit(response)
752-
self.http_response = response
753-
self.status_code = response.status_code
754-
755-
try:
756-
parsed_response: Dict = json.loads(response.content)
757-
self.api_error = APIError.from_dict(parsed_response)
758-
except ValueError:
759-
pass
760-
761-
def __str__(self) -> str:
762-
if self.api_error:
763-
return f'Stream error code {self.api_error.code}: {self.api_error.message}"'
764-
body_preview = ""
765-
try:
766-
text = self.http_response.text[:200] if self.http_response.text else ""
767-
if text:
768-
body_preview = f" body: {text}"
769-
except Exception:
770-
pass
771-
return f"Stream error HTTP code: {self.status_code}{body_preview}"
730+
def __getattr__(name: str):
731+
"""StreamApiException is exported under its new name; resolve here lazily and warn once."""
732+
if name == "StreamAPIException":
733+
warnings.warn(
734+
"getstream.base.StreamAPIException is deprecated; import "
735+
"StreamApiException from getstream (or getstream.exceptions) "
736+
"instead. The legacy alias will be removed one minor cycle after "
737+
"this release.",
738+
DeprecationWarning,
739+
stacklevel=2,
740+
)
741+
return StreamApiException
742+
raise AttributeError(f"module 'getstream.base' has no attribute {name!r}")
772743

773744

774745
def parse_duration_from_body(body: bytes) -> Optional[str]:

0 commit comments

Comments
 (0)