Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/gumloop/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,28 @@
from gumloop._client import GumloopClient
from gumloop.errors import APIStatusError
from gumloop.errors import AuthenticationError
from gumloop.errors import BadRequestError
from gumloop.errors import GumloopError
from gumloop.errors import NotFoundError
from gumloop.errors import PermissionDeniedError
from gumloop.errors import RateLimitError
from gumloop.errors import ServerError
from gumloop.errors import UnprocessableEntityError
from gumloop.oauth import OAuth

__version__ = "0.3.1"
__all__ = [
"APIStatusError",
"AsyncGumloop",
"AuthenticationError",
"BadRequestError",
"Gumloop",
"GumloopClient",
"GumloopError",
"NotFoundError",
"OAuth",
"PermissionDeniedError",
"RateLimitError",
"ServerError",
"UnprocessableEntityError",
]
7 changes: 7 additions & 0 deletions src/gumloop/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import httpx

from gumloop._http import DEFAULT_MAX_RETRIES
from gumloop._http import AsyncHttpClient
from gumloop._http import HttpClient
from gumloop.oauth import OAuth
Expand Down Expand Up @@ -55,6 +56,7 @@ def __init__(
stream_base_url: str | None = None,
timeout: float = DEFAULT_TIMEOUT,
stream_timeout: float | None = DEFAULT_STREAM_TIMEOUT,
max_retries: int = DEFAULT_MAX_RETRIES,
) -> None:
self.api_key = api_key
self.access_token = access_token or api_key or os.environ.get("GUMLOOP_ACCESS_TOKEN")
Expand All @@ -67,6 +69,7 @@ def __init__(
self.stream_base_url = (stream_base_url or _derive_stream_base_url(self.base_url)).rstrip("/")
self.timeout = timeout
self.stream_timeout = stream_timeout
self.max_retries = max_retries

self._http = HttpClient(
base_url=self.base_url,
Expand All @@ -75,6 +78,7 @@ def __init__(
user_id=self.user_id,
timeout=self.timeout,
stream_timeout=self.stream_timeout,
max_retries=self.max_retries,
)

self.agents = Agents(self._http)
Expand Down Expand Up @@ -110,6 +114,7 @@ def __init__(
stream_base_url: str | None = None,
timeout: float = DEFAULT_TIMEOUT,
stream_timeout: float | None = DEFAULT_STREAM_TIMEOUT,
max_retries: int = DEFAULT_MAX_RETRIES,
) -> None:
self.api_key = api_key
self.access_token = access_token or api_key or os.environ.get("GUMLOOP_ACCESS_TOKEN")
Expand All @@ -119,6 +124,7 @@ def __init__(
self.stream_base_url = (stream_base_url or _derive_stream_base_url(self.base_url)).rstrip("/")
self.timeout = timeout
self.stream_timeout = stream_timeout
self.max_retries = max_retries

self._http = AsyncHttpClient(
base_url=self.base_url,
Expand All @@ -127,6 +133,7 @@ def __init__(
user_id=self.user_id,
timeout=self.timeout,
stream_timeout=self.stream_timeout,
max_retries=self.max_retries,
)

self.agents = AsyncAgents(self._http)
Expand Down
140 changes: 112 additions & 28 deletions src/gumloop/_http.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

import asyncio
import logging
import random
import time
from collections.abc import AsyncIterator
from collections.abc import Iterator
from collections.abc import Mapping
Expand All @@ -13,7 +16,10 @@
from pydantic import BaseModel
from pydantic import ValidationError

from gumloop.errors import APIStatusError
from gumloop.errors import AuthenticationError
from gumloop.errors import RateLimitError
from gumloop.errors import ServerError
from gumloop.errors import to_api_error
from gumloop.types import StreamEvent

Expand All @@ -22,6 +28,11 @@
_DONE_SENTINEL = "[DONE]"
_T = TypeVar("_T", bound=BaseModel)

DEFAULT_MAX_RETRIES = 2
# Base delay in seconds for exponential backoff; actual delay is base * 2^attempt + jitter.
_RETRY_BASE_DELAY = 0.5
_RETRY_MAX_DELAY = 60.0


def _auth_headers(access_token: str | None, user_id: str | None) -> dict[str, str]:
if not access_token:
Expand All @@ -41,6 +52,55 @@ def _omit_none_params(params: Mapping[str, Any] | None) -> dict[str, Any] | None
return {k: v for k, v in params.items() if v is not None}


_IDEMPOTENT_METHODS = frozenset({"GET", "HEAD", "DELETE", "OPTIONS", "PUT"})


def _should_retry(exc: APIStatusError, method: str) -> bool:
# Never retry client errors.
if not isinstance(exc, (RateLimitError, ServerError)):
return False
# POST/PATCH are non-idempotent: a 5xx may arrive after the server already
# committed the write, so retrying would duplicate the mutation. Only retry
# them on 429 (rate-limit), where the server explicitly guarantees the
# request was not processed.
if method.upper() not in _IDEMPOTENT_METHODS and isinstance(exc, ServerError):
return False
return True


def _retry_delay(attempt: int, retry_after: float | None) -> float:
"""Return how many seconds to sleep before the next attempt.

Honours a ``Retry-After`` header when present; otherwise uses exponential
backoff with full jitter so concurrent clients don't thunderherd.
"""
if retry_after is not None:
return retry_after
cap = min(_RETRY_BASE_DELAY * (2**attempt), _RETRY_MAX_DELAY)
return random.uniform(0, cap)


def _parse_retry_after(response: httpx.Response) -> float | None:
import email.utils

raw = response.headers.get("retry-after")
if raw is None:
return None
try:
return float(raw)
except ValueError:
pass
# RFC 7231 also allows an HTTP-date: "Retry-After: Wed, 21 Oct 2015 07:28:00 GMT"
try:
dt = email.utils.parsedate_to_datetime(raw)
import datetime

delta = (dt - datetime.datetime.now(tz=datetime.timezone.utc)).total_seconds()
return max(delta, 0.0)
except Exception:
return None
Comment on lines +83 to +101

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Retry-After HTTP-date format is silently dropped

RFC 7231 allows Retry-After to be either a delay-in-seconds (Retry-After: 30) or an HTTP-date (Retry-After: Wed, 21 Oct 2015 07:28:00 GMT). When the header contains a date string, float(raw) raises ValueError, the function returns None, and the code falls back to exponential backoff — which for attempt=0 is at most 0.5 s. If the server sent a date meaning "wait 30 s", the client will retry far too soon, burn through all retries, and still surface a RateLimitError to the caller while likely worsening the rate-limit situation on the server side.



def _decode_sse(event: ServerSentEvent) -> StreamEvent:
try:
decoded: Any = event.json() if event.data else {}
Expand Down Expand Up @@ -71,11 +131,13 @@ def __init__(
user_id: str | None,
timeout: float,
stream_timeout: float | None,
max_retries: int = DEFAULT_MAX_RETRIES,
) -> None:
self.access_token = access_token
self.user_id = user_id
self._stream_base_url = stream_base_url.rstrip("/")
self._stream_timeout = stream_timeout
self._max_retries = max_retries
self._client = httpx.Client(base_url=base_url.rstrip("/"), timeout=timeout)

def close(self) -> None:
Expand Down Expand Up @@ -119,15 +181,18 @@ def post_to_stream_host(self, path: str, *, json: Any = None) -> Any:
# the api host has no handler for them.
headers = _auth_headers(self.access_token, self.user_id)
headers["Content-Type"] = "application/json"
response = self._client.post(
f"{self._stream_base_url}/{path.lstrip('/')}",
headers=headers,
timeout=self._stream_timeout,
json=json,
)
if response.status_code >= 400:
raise to_api_error(response)
return response.json() if response.content else None
url = f"{self._stream_base_url}/{path.lstrip('/')}"
for attempt in range(self._max_retries + 1):
response = self._client.post(url, headers=headers, timeout=self._stream_timeout, json=json)
if response.status_code < 400:
return response.json() if response.content else None
exc = to_api_error(response)
if attempt < self._max_retries and _should_retry(exc, "POST"):
delay = _retry_delay(attempt, _parse_retry_after(response))
logger.debug("retrying stream-host request (attempt %d, delay %.2fs)", attempt + 1, delay)
time.sleep(delay)
continue
raise exc

def stream(
self,
Expand Down Expand Up @@ -184,7 +249,7 @@ def stream_typed(
yield response_model.model_validate_json(event.data)
except ValidationError:
# Server-side mid-stream error frames or schema-drift events
# land here.
# land here.
logger.debug("dropped non-%s SSE: %s", response_model.__name__, event.data)
continue

Comment on lines 197 to 255

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 stream and stream_typed have no retry logic

Both HttpClient.stream / HttpClient.stream_typed (and their async counterparts) still use the original single-attempt pattern. A 429 or 5xx on a streaming request will raise immediately, while the same status on a non-streaming request through _request or post_to_stream_host will be retried up to max_retries times. Callers who expect consistent retry behaviour regardless of whether they use the streaming or non-streaming variant will be surprised by this inconsistency.

Expand All @@ -194,10 +259,17 @@ def _request(self, method: str, path: str, **kwargs: Any) -> Any:
headers = _auth_headers(self.access_token, self.user_id)
if not kwargs.get("files"):
headers["Content-Type"] = "application/json"
response = self._client.request(method, path, headers=headers, **kwargs)
if response.status_code >= 400:
raise to_api_error(response)
return response.json() if response.content else None
for attempt in range(self._max_retries + 1):
response = self._client.request(method, path, headers=headers, **kwargs)
if response.status_code < 400:
return response.json() if response.content else None
exc = to_api_error(response)
if attempt < self._max_retries and _should_retry(exc, method):
delay = _retry_delay(attempt, _parse_retry_after(response))
logger.debug("retrying %s %s (attempt %d, delay %.2fs)", method, path, attempt + 1, delay)
time.sleep(delay)
continue
raise exc
Comment on lines +262 to +272

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Non-idempotent POST/PATCH retried on 5xx

_request is used by post() and patch() as well as get() and delete(), so the retry loop applies to all of them equally. A 5xx can be returned by a proxy or load balancer after the upstream already committed the write — retrying will then execute the mutation a second time. For example, client.agents.create(...) hitting a 502 from a gateway would create the agent twice before the caller sees any error. The retry should be limited to idempotent methods (GET, DELETE, HEAD) or guarded by a method check.



class AsyncHttpClient:
Expand All @@ -212,11 +284,13 @@ def __init__(
user_id: str | None,
timeout: float,
stream_timeout: float | None,
max_retries: int = DEFAULT_MAX_RETRIES,
) -> None:
self.access_token = access_token
self.user_id = user_id
self._stream_base_url = stream_base_url.rstrip("/")
self._stream_timeout = stream_timeout
self._max_retries = max_retries
self._client = httpx.AsyncClient(base_url=base_url.rstrip("/"), timeout=timeout)

async def aclose(self) -> None:
Expand Down Expand Up @@ -257,15 +331,18 @@ async def delete(self, path: str) -> Any:
async def post_to_stream_host(self, path: str, *, json: Any = None) -> Any:
headers = _auth_headers(self.access_token, self.user_id)
headers["Content-Type"] = "application/json"
response = await self._client.post(
f"{self._stream_base_url}/{path.lstrip('/')}",
headers=headers,
timeout=self._stream_timeout,
json=json,
)
if response.status_code >= 400:
raise to_api_error(response)
return response.json() if response.content else None
url = f"{self._stream_base_url}/{path.lstrip('/')}"
for attempt in range(self._max_retries + 1):
response = await self._client.post(url, headers=headers, timeout=self._stream_timeout, json=json)
if response.status_code < 400:
return response.json() if response.content else None
exc = to_api_error(response)
if attempt < self._max_retries and _should_retry(exc, "POST"):
delay = _retry_delay(attempt, _parse_retry_after(response))
logger.debug("retrying stream-host request (attempt %d, delay %.2fs)", attempt + 1, delay)
await asyncio.sleep(delay)
continue
raise exc

async def stream(
self,
Expand Down Expand Up @@ -320,15 +397,22 @@ async def stream_typed(
yield response_model.model_validate_json(event.data)
except ValidationError:
# Server-side mid-stream error frames or schema-drift events
# land here.
# land here.
logger.debug("dropped non-%s SSE: %s", response_model.__name__, event.data)
continue

async def _request(self, method: str, path: str, **kwargs: Any) -> Any:
headers = _auth_headers(self.access_token, self.user_id)
if not kwargs.get("files"):
headers["Content-Type"] = "application/json"
response = await self._client.request(method, path, headers=headers, **kwargs)
if response.status_code >= 400:
raise to_api_error(response)
return response.json() if response.content else None
for attempt in range(self._max_retries + 1):
response = await self._client.request(method, path, headers=headers, **kwargs)
if response.status_code < 400:
return response.json() if response.content else None
exc = to_api_error(response)
if attempt < self._max_retries and _should_retry(exc, method):
delay = _retry_delay(attempt, _parse_retry_after(response))
logger.debug("retrying %s %s (attempt %d, delay %.2fs)", method, path, attempt + 1, delay)
await asyncio.sleep(delay)
continue
raise exc
47 changes: 44 additions & 3 deletions src/gumloop/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,43 @@ def __init__(self, message: str, *, status_code: int, body: Any = None) -> None:
self.details = self.error.get("details", {}) if isinstance(self.error, dict) else {}


class BadRequestError(APIStatusError):
"""HTTP 400 — the request was malformed or contained invalid parameters."""


class PermissionDeniedError(APIStatusError):
"""HTTP 403 — the caller does not have permission to perform this action."""


class NotFoundError(APIStatusError):
"""HTTP 404 — the requested resource does not exist."""


class UnprocessableEntityError(APIStatusError):
"""HTTP 422 — the request was well-formed but semantically invalid."""


class RateLimitError(APIStatusError):
"""HTTP 429 — too many requests; back off and retry."""


class ServerError(APIStatusError):
"""HTTP 5xx — an unexpected error occurred on the Gumloop server."""


_STATUS_MAP: dict[int, type[APIStatusError]] = {
400: BadRequestError,
403: PermissionDeniedError,
404: NotFoundError,
422: UnprocessableEntityError,
429: RateLimitError,
}


def to_api_error(response: httpx.Response) -> APIStatusError:
"""Translate a non-success ``httpx.Response`` into :class:`APIStatusError`,
extracting the backend error envelope's ``message`` when present."""
"""Translate a non-success ``httpx.Response`` into the most specific
:class:`APIStatusError` subclass available, extracting the backend error
envelope's ``message`` when present."""
try:
body: Any = response.json()
except ValueError:
Expand All @@ -40,4 +74,11 @@ def to_api_error(response: httpx.Response) -> APIStatusError:
if isinstance(error, dict)
else f"Gumloop API returned HTTP {response.status_code}"
)
return APIStatusError(message, status_code=response.status_code, body=body)
cls: type[APIStatusError]
if response.status_code in _STATUS_MAP:
cls = _STATUS_MAP[response.status_code]
elif response.status_code >= 500:
cls = ServerError
else:
cls = APIStatusError
return cls(message, status_code=response.status_code, body=body)
Loading