Skip to content

Commit 9d85042

Browse files
committed
REST: Add retry and timeout configuration for REST catalog
Closes #2772. The REST Catalog uses requests with no retries and no timeout by default, so transient 5xx/network failures bubble up immediately and slow servers can hang the client indefinitely (e.g. a Polaris instance returning 504 from a proxy). Add an optional connection: block on the catalog properties: catalog: default: uri: http://rest-catalog/ws/ connection: timeout: 60 retry: total: 5 backoff_factor: 1.0 status_forcelist: [429, 500, 502, 503, 504] allowed_methods: [GET, HEAD, OPTIONS] connection.retry is passed verbatim to urllib3.util.retry.Retry. Both keys are optional and opt-in: when neither is set the default requests behavior is preserved. Signed-off-by: rahulsmahadev <rahul.mahadev@databricks.com>
1 parent d339391 commit 9d85042

3 files changed

Lines changed: 163 additions & 3 deletions

File tree

mkdocs/docs/configuration.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,31 @@ catalog:
348348
| snapshot-loading-mode | refs | The snapshots to return in the body of the metadata. Setting the value to `all` would return the full set of snapshots currently valid for the table. Setting the value to `refs` would load all snapshots referenced by branches or tags. |
349349
| `header.X-Iceberg-Access-Delegation` | `vended-credentials` | Signal to the server that the client supports delegated access via a comma-separated list of access mechanisms. The server may choose to supply access via any or none of the requested mechanisms. When using `vended-credentials`, the server provides temporary credentials to the client. When using `remote-signing`, the server signs requests on behalf of the client. (default: `vended-credentials`) |
350350

351+
#### Retry and timeout
352+
353+
The REST Catalog uses `requests` with no retries and no timeout by default, so transient
354+
5xx/network failures bubble up immediately and slow servers can hang the client indefinitely.
355+
Set a `connection:` block on the catalog to opt in to a per-request timeout and a retry policy.
356+
Both keys are optional; when neither is set, the default `requests` behavior is preserved.
357+
358+
```yaml
359+
catalog:
360+
default:
361+
uri: http://rest-catalog/ws/
362+
connection:
363+
timeout: 60 # seconds, applied to every HTTP call
364+
retry:
365+
total: 5
366+
backoff_factor: 1.0
367+
status_forcelist: [429, 500, 502, 503, 504]
368+
allowed_methods: [GET, HEAD, OPTIONS]
369+
```
370+
371+
| Key | Example | Description |
372+
| ---------------------------- | ------------------------------------ | ------------------------------------------------------------------------------------------------------ |
373+
| connection.timeout | 60 | Per-request timeout in seconds. Must be a positive number. |
374+
| connection.retry | `{total: 5, backoff_factor: 1.0}` | Mapping passed verbatim as kwargs to [`urllib3.util.retry.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry). |
375+
351376
#### Headers in REST Catalog
352377

353378
To configure custom headers in REST Catalog, include them in the catalog properties with `header.<Header-Name>`. This

pyiceberg/catalog/rest/__init__.py

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
from urllib.parse import quote, unquote
2626

2727
from pydantic import ConfigDict, Field, TypeAdapter, field_validator
28-
from requests import HTTPError, Session
28+
from requests import HTTPError, PreparedRequest, Response, Session
29+
from requests.adapters import HTTPAdapter
2930
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt
3031
from typing_extensions import override
32+
from urllib3.util.retry import Retry
3133

3234
from pyiceberg import __version__
3335
from pyiceberg.catalog import BOTOCORE_SESSION, TOKEN, URI, WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary
@@ -255,6 +257,9 @@ class ScanPlanningMode(Enum):
255257
SIGV4_SERVICE = "rest.signing-name"
256258
SIGV4_MAX_RETRIES = "rest.sigv4.max-retries"
257259
SIGV4_MAX_RETRIES_DEFAULT = 10
260+
CONNECTION = "connection"
261+
CONNECTION_TIMEOUT = "timeout"
262+
CONNECTION_RETRY = "retry"
258263
EMPTY_BODY_SHA256: str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
259264
OAUTH2_SERVER_URI = "oauth2-server-uri"
260265
SNAPSHOT_LOADING_MODE = "snapshot-loading-mode"
@@ -392,6 +397,63 @@ class ListViewsResponse(IcebergBaseModel):
392397
_PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse)
393398

394399

400+
class _RetryTimeoutHTTPAdapter(HTTPAdapter):
401+
"""HTTPAdapter that applies a default per-request timeout.
402+
403+
requests does not provide a way to set a default timeout on a Session;
404+
without this adapter, every call would have to thread `timeout=` through.
405+
The adapter applies `self._timeout` whenever a per-call timeout is not set.
406+
"""
407+
408+
def __init__(self, timeout: float | None = None, max_retries: Retry | int | None = None) -> None:
409+
self._timeout = timeout
410+
if max_retries is not None:
411+
super().__init__(max_retries=max_retries)
412+
else:
413+
super().__init__()
414+
415+
def send(self, request: PreparedRequest, **kwargs: Any) -> Response:
416+
if kwargs.get("timeout") is None and self._timeout is not None:
417+
kwargs["timeout"] = self._timeout
418+
return super().send(request, **kwargs)
419+
420+
421+
def _create_connection_adapter(properties: Properties) -> _RetryTimeoutHTTPAdapter | None:
422+
"""Build a connection adapter from the optional `connection.*` properties.
423+
424+
Returns None when no `connection` block is supplied, leaving the default
425+
Session behavior unchanged. Raises ValueError on invalid input.
426+
"""
427+
connection_config = properties.get(CONNECTION)
428+
if not connection_config:
429+
return None
430+
if not isinstance(connection_config, dict):
431+
raise ValueError(f"`{CONNECTION}` must be a mapping, got: {type(connection_config).__name__}")
432+
433+
timeout: float | None = None
434+
if (raw_timeout := connection_config.get(CONNECTION_TIMEOUT)) is not None:
435+
try:
436+
timeout = float(raw_timeout)
437+
except (TypeError, ValueError) as e:
438+
raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a number, got: {raw_timeout!r}") from e
439+
if timeout <= 0:
440+
raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a positive number, got: {timeout}")
441+
442+
retry: Retry | None = None
443+
if (retry_config := connection_config.get(CONNECTION_RETRY)) is not None:
444+
if not isinstance(retry_config, dict):
445+
raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRY}` must be a mapping, got: {type(retry_config).__name__}")
446+
try:
447+
retry = Retry(**retry_config)
448+
except TypeError as e:
449+
raise ValueError(f"Invalid `{CONNECTION}.{CONNECTION_RETRY}` configuration: {e}") from e
450+
451+
if timeout is None and retry is None:
452+
return None
453+
454+
return _RetryTimeoutHTTPAdapter(timeout=timeout, max_retries=retry)
455+
456+
395457
class RestCatalog(Catalog):
396458
uri: str
397459
_session: Session
@@ -418,6 +480,12 @@ def _create_session(self) -> Session:
418480
"""Create a request session with provided catalog configuration."""
419481
session = Session()
420482

483+
# Mount the retry/timeout adapter when `connection.*` properties are set.
484+
# SigV4's adapter mounted below at `self.uri` is a longer prefix and still wins for that host.
485+
if (connection_adapter := _create_connection_adapter(self.properties)) is not None:
486+
session.mount("http://", connection_adapter)
487+
session.mount("https://", connection_adapter)
488+
421489
# Set HTTP headers
422490
self._config_headers(session)
423491

@@ -763,8 +831,6 @@ def _init_sigv4(self, session: Session) -> None:
763831
import boto3
764832
from botocore.auth import SigV4Auth
765833
from botocore.awsrequest import AWSRequest
766-
from requests import PreparedRequest
767-
from requests.adapters import HTTPAdapter
768834

769835
class SigV4Adapter(HTTPAdapter):
770836
def __init__(self, **properties: str):

tests/catalog/test_rest.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
import pyiceberg
3333
from pyiceberg.catalog import PropertiesUpdateSummary, load_catalog
3434
from pyiceberg.catalog.rest import (
35+
CONNECTION,
36+
CONNECTION_RETRY,
37+
CONNECTION_TIMEOUT,
3538
DEFAULT_ENDPOINTS,
3639
EMPTY_BODY_SHA256,
3740
OAUTH2_SERVER_URI,
@@ -43,6 +46,7 @@
4346
HttpMethod,
4447
RestCatalog,
4548
ScanPlanningMode,
49+
_RetryTimeoutHTTPAdapter,
4650
)
4751
from pyiceberg.exceptions import (
4852
AuthorizationExpiredError,
@@ -2019,6 +2023,71 @@ def test_request_session_with_ssl_client_cert() -> None:
20192023
assert "Could not find the TLS certificate file, invalid path: path_to_client_cert" in str(e.value)
20202024

20212025

2026+
def test_session_without_connection_config_uses_default_adapter(rest_mock: Mocker) -> None:
2027+
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
2028+
for adapter in catalog._session.adapters.values():
2029+
assert not isinstance(adapter, _RetryTimeoutHTTPAdapter)
2030+
2031+
2032+
def test_session_with_connection_timeout_and_retry(rest_mock: Mocker) -> None:
2033+
catalog_properties = {
2034+
"uri": TEST_URI,
2035+
"token": TEST_TOKEN,
2036+
CONNECTION: {
2037+
CONNECTION_TIMEOUT: 60,
2038+
CONNECTION_RETRY: {
2039+
"total": 5,
2040+
"backoff_factor": 1.0,
2041+
"status_forcelist": [429, 500, 502, 503, 504],
2042+
"allowed_methods": ["GET", "HEAD", "OPTIONS"],
2043+
},
2044+
},
2045+
}
2046+
catalog = RestCatalog("rest", **catalog_properties) # type: ignore
2047+
2048+
https_adapter = catalog._session.adapters["https://"]
2049+
http_adapter = catalog._session.adapters["http://"]
2050+
assert isinstance(https_adapter, _RetryTimeoutHTTPAdapter)
2051+
assert https_adapter is http_adapter
2052+
assert https_adapter._timeout == 60.0
2053+
assert https_adapter.max_retries.total == 5
2054+
assert https_adapter.max_retries.backoff_factor == 1.0
2055+
assert https_adapter.max_retries.status_forcelist == [429, 500, 502, 503, 504]
2056+
assert set(https_adapter.max_retries.allowed_methods) == {"GET", "HEAD", "OPTIONS"}
2057+
2058+
2059+
def test_session_with_connection_timeout_only(rest_mock: Mocker) -> None:
2060+
catalog_properties = {
2061+
"uri": TEST_URI,
2062+
"token": TEST_TOKEN,
2063+
CONNECTION: {CONNECTION_TIMEOUT: "30"},
2064+
}
2065+
catalog = RestCatalog("rest", **catalog_properties) # type: ignore
2066+
adapter = catalog._session.adapters["https://"]
2067+
assert isinstance(adapter, _RetryTimeoutHTTPAdapter)
2068+
assert adapter._timeout == 30.0
2069+
2070+
2071+
def test_session_with_invalid_connection_timeout_raises(rest_mock: Mocker) -> None:
2072+
catalog_properties = {
2073+
"uri": TEST_URI,
2074+
"token": TEST_TOKEN,
2075+
CONNECTION: {CONNECTION_TIMEOUT: -1},
2076+
}
2077+
with pytest.raises(ValueError, match="`connection.timeout` must be a positive number"):
2078+
RestCatalog("rest", **catalog_properties) # type: ignore
2079+
2080+
2081+
def test_session_with_invalid_connection_retry_kwarg_raises(rest_mock: Mocker) -> None:
2082+
catalog_properties = {
2083+
"uri": TEST_URI,
2084+
"token": TEST_TOKEN,
2085+
CONNECTION: {CONNECTION_RETRY: {"bogus_kwarg": 1}},
2086+
}
2087+
with pytest.raises(ValueError, match="Invalid `connection.retry` configuration"):
2088+
RestCatalog("rest", **catalog_properties) # type: ignore
2089+
2090+
20222091
def test_rest_catalog_with_basic_auth_type(rest_mock: Mocker) -> None:
20232092
# Given
20242093
rest_mock.get(

0 commit comments

Comments
 (0)