Skip to content

Commit 6fb87ff

Browse files
committed
Address review feedback: explicit options, exercise retries, fix mypy
Per @Fokko + @rambleraptor: drop the urllib3 retry-dict pass-through and expose only three explicit knobs on the connection block (timeout, retries, backoff-factor). Hard-code the retry policy (status_forcelist of transient codes; allowed_methods of idempotent verbs) so users cannot misconfigure e.g. raise_on_status=False and silently swallow 4xx errors. Per @rambleraptor: add a test that exercises the retry path end-to-end by spinning up a loopback HTTP server that returns three 503s then a 200, verifying the catalog makes all four attempts. requests_mock can't be used here because it replaces the HTTPAdapter and bypasses retry logic. Fix the three mypy errors flagged by CI: - _RetryTimeoutHTTPAdapter.send now matches HTTPAdapter.send's full signature instead of (request, **kwargs). - Test's set(adapter.max_retries.allowed_methods) now guards the Collection[str] | None type. Signed-off-by: rahulsmahadev <rahul.mahadev@databricks.com>
1 parent 9d85042 commit 6fb87ff

3 files changed

Lines changed: 134 additions & 40 deletions

File tree

mkdocs/docs/configuration.md

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -351,27 +351,28 @@ catalog:
351351
#### Retry and timeout
352352

353353
The REST Catalog uses `requests` with no retries and no timeout by default, so transient
354-
5xx/network failures bubble up immediately and slow servers can hang the client indefinitely.
354+
5xx / network failures bubble up immediately and slow servers can hang the client indefinitely.
355355
Set a `connection:` block on the catalog to opt in to a per-request timeout and a retry policy.
356-
Both keys are optional; when neither is set, the default `requests` behavior is preserved.
356+
Every key is optional; when none are set, the default `requests` behavior is preserved.
357357

358358
```yaml
359359
catalog:
360360
default:
361361
uri: http://rest-catalog/ws/
362362
connection:
363-
timeout: 60 # seconds, applied to every HTTP call
364-
retry:
365-
total: 5
366-
backoff_factor: 1.0
367-
status_forcelist: [429, 500, 502, 503, 504]
368-
allowed_methods: [GET, HEAD, OPTIONS]
369-
```
370-
371-
| Key | Example | Description |
372-
| ---------------------------- | ------------------------------------ | ------------------------------------------------------------------------------------------------------ |
373-
| connection.timeout | 60 | Per-request timeout in seconds. Must be a positive number. |
374-
| connection.retry | `{total: 5, backoff_factor: 1.0}` | Mapping passed verbatim as kwargs to [`urllib3.util.retry.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry). |
363+
timeout: 60 # seconds, applied to every HTTP call
364+
retries: 5 # number of retry attempts on transient failures
365+
backoff-factor: 1.0 # exponential backoff between retries
366+
```
367+
368+
| Key | Example | Description |
369+
| ---------------------------- | -------- | ------------------------------------------------------------------------------------------------------------------ |
370+
| connection.timeout | 60 | Per-request timeout in seconds. Must be a positive number. |
371+
| connection.retries | 5 | Number of retry attempts for transient failures. Must be non-negative. |
372+
| connection.backoff-factor | 1.0 | Backoff factor between retry attempts. Must be non-negative. See [`urllib3` Retry docs](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry) for the formula. |
373+
374+
Retries are applied to idempotent methods only (`GET`, `HEAD`, `OPTIONS`) and to the
375+
transient HTTP status codes `429`, `500`, `502`, `503`, `504`. Other failures are not retried.
375376

376377
#### Headers in REST Catalog
377378

pyiceberg/catalog/rest/__init__.py

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from __future__ import annotations
1818

1919
from collections import deque
20+
from collections.abc import MutableMapping
2021
from enum import Enum
2122
from typing import (
2223
TYPE_CHECKING,
@@ -259,7 +260,12 @@ class ScanPlanningMode(Enum):
259260
SIGV4_MAX_RETRIES_DEFAULT = 10
260261
CONNECTION = "connection"
261262
CONNECTION_TIMEOUT = "timeout"
262-
CONNECTION_RETRY = "retry"
263+
CONNECTION_RETRIES = "retries"
264+
CONNECTION_BACKOFF_FACTOR = "backoff-factor"
265+
# Hard-coded internally so users cannot misconfigure the retry policy
266+
# (e.g. setting raise_on_status=False would swallow 4xx errors silently).
267+
_CONNECTION_RETRY_STATUS_FORCELIST = (429, 500, 502, 503, 504)
268+
_CONNECTION_RETRY_ALLOWED_METHODS = frozenset({"GET", "HEAD", "OPTIONS"})
263269
EMPTY_BODY_SHA256: str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
264270
OAUTH2_SERVER_URI = "oauth2-server-uri"
265271
SNAPSHOT_LOADING_MODE = "snapshot-loading-mode"
@@ -412,10 +418,18 @@ def __init__(self, timeout: float | None = None, max_retries: Retry | int | None
412418
else:
413419
super().__init__()
414420

415-
def send(self, request: PreparedRequest, **kwargs: Any) -> Response:
416-
if kwargs.get("timeout") is None and self._timeout is not None:
417-
kwargs["timeout"] = self._timeout
418-
return super().send(request, **kwargs)
421+
def send(
422+
self,
423+
request: PreparedRequest,
424+
stream: bool = False,
425+
timeout: None | float | tuple[float, float] | tuple[float, None] = None,
426+
verify: bool | str = True,
427+
cert: None | bytes | str | tuple[bytes | str, bytes | str] = None,
428+
proxies: MutableMapping[str, str] | None = None,
429+
) -> Response:
430+
if timeout is None:
431+
timeout = self._timeout
432+
return super().send(request, stream=stream, timeout=timeout, verify=verify, cert=cert, proxies=proxies)
419433

420434

421435
def _create_connection_adapter(properties: Properties) -> _RetryTimeoutHTTPAdapter | None:
@@ -439,19 +453,37 @@ def _create_connection_adapter(properties: Properties) -> _RetryTimeoutHTTPAdapt
439453
if timeout <= 0:
440454
raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a positive number, got: {timeout}")
441455

442-
retry: Retry | None = None
443-
if (retry_config := connection_config.get(CONNECTION_RETRY)) is not None:
444-
if not isinstance(retry_config, dict):
445-
raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRY}` must be a mapping, got: {type(retry_config).__name__}")
456+
retries: int | None = None
457+
if (raw_retries := connection_config.get(CONNECTION_RETRIES)) is not None:
458+
try:
459+
retries = int(raw_retries)
460+
except (TypeError, ValueError) as e:
461+
raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRIES}` must be an integer, got: {raw_retries!r}") from e
462+
if retries < 0:
463+
raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRIES}` must be non-negative, got: {retries}")
464+
465+
backoff_factor: float | None = None
466+
if (raw_backoff := connection_config.get(CONNECTION_BACKOFF_FACTOR)) is not None:
446467
try:
447-
retry = Retry(**retry_config)
448-
except TypeError as e:
449-
raise ValueError(f"Invalid `{CONNECTION}.{CONNECTION_RETRY}` configuration: {e}") from e
468+
backoff_factor = float(raw_backoff)
469+
except (TypeError, ValueError) as e:
470+
raise ValueError(f"`{CONNECTION}.{CONNECTION_BACKOFF_FACTOR}` must be a number, got: {raw_backoff!r}") from e
471+
if backoff_factor < 0:
472+
raise ValueError(f"`{CONNECTION}.{CONNECTION_BACKOFF_FACTOR}` must be non-negative, got: {backoff_factor}")
473+
474+
max_retries: Retry | None = None
475+
if retries is not None or backoff_factor is not None:
476+
max_retries = Retry(
477+
total=retries if retries is not None else 0,
478+
backoff_factor=backoff_factor if backoff_factor is not None else 0,
479+
status_forcelist=list(_CONNECTION_RETRY_STATUS_FORCELIST),
480+
allowed_methods=_CONNECTION_RETRY_ALLOWED_METHODS,
481+
)
450482

451-
if timeout is None and retry is None:
483+
if timeout is None and max_retries is None:
452484
return None
453485

454-
return _RetryTimeoutHTTPAdapter(timeout=timeout, max_retries=retry)
486+
return _RetryTimeoutHTTPAdapter(timeout=timeout, max_retries=max_retries)
455487

456488

457489
class RestCatalog(Catalog):

tests/catalog/test_rest.py

Lines changed: 73 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
from pyiceberg.catalog import PropertiesUpdateSummary, load_catalog
3434
from pyiceberg.catalog.rest import (
3535
CONNECTION,
36-
CONNECTION_RETRY,
36+
CONNECTION_BACKOFF_FACTOR,
37+
CONNECTION_RETRIES,
3738
CONNECTION_TIMEOUT,
3839
DEFAULT_ENDPOINTS,
3940
EMPTY_BODY_SHA256,
@@ -2029,18 +2030,14 @@ def test_session_without_connection_config_uses_default_adapter(rest_mock: Mocke
20292030
assert not isinstance(adapter, _RetryTimeoutHTTPAdapter)
20302031

20312032

2032-
def test_session_with_connection_timeout_and_retry(rest_mock: Mocker) -> None:
2033+
def test_session_with_connection_timeout_and_retries(rest_mock: Mocker) -> None:
20332034
catalog_properties = {
20342035
"uri": TEST_URI,
20352036
"token": TEST_TOKEN,
20362037
CONNECTION: {
20372038
CONNECTION_TIMEOUT: 60,
2038-
CONNECTION_RETRY: {
2039-
"total": 5,
2040-
"backoff_factor": 1.0,
2041-
"status_forcelist": [429, 500, 502, 503, 504],
2042-
"allowed_methods": ["GET", "HEAD", "OPTIONS"],
2043-
},
2039+
CONNECTION_RETRIES: 5,
2040+
CONNECTION_BACKOFF_FACTOR: 1.0,
20442041
},
20452042
}
20462043
catalog = RestCatalog("rest", **catalog_properties) # type: ignore
@@ -2052,8 +2049,10 @@ def test_session_with_connection_timeout_and_retry(rest_mock: Mocker) -> None:
20522049
assert https_adapter._timeout == 60.0
20532050
assert https_adapter.max_retries.total == 5
20542051
assert https_adapter.max_retries.backoff_factor == 1.0
2052+
# Internal retry policy: transient codes and idempotent methods only.
20552053
assert https_adapter.max_retries.status_forcelist == [429, 500, 502, 503, 504]
2056-
assert set(https_adapter.max_retries.allowed_methods) == {"GET", "HEAD", "OPTIONS"}
2054+
allowed_methods = https_adapter.max_retries.allowed_methods or frozenset()
2055+
assert set(allowed_methods) == {"GET", "HEAD", "OPTIONS"}
20572056

20582057

20592058
def test_session_with_connection_timeout_only(rest_mock: Mocker) -> None:
@@ -2066,6 +2065,68 @@ def test_session_with_connection_timeout_only(rest_mock: Mocker) -> None:
20662065
adapter = catalog._session.adapters["https://"]
20672066
assert isinstance(adapter, _RetryTimeoutHTTPAdapter)
20682067
assert adapter._timeout == 30.0
2068+
# No retry options set, so no Retry object is configured.
2069+
assert adapter.max_retries.total == 0
2070+
2071+
2072+
def test_session_retries_on_transient_5xx_then_succeeds() -> None:
2073+
"""Three real 503 responses followed by a 200; the catalog should make all four attempts.
2074+
2075+
`requests_mock` would replace our HTTPAdapter, bypassing the retry logic we want to exercise,
2076+
so this test stands up an actual `http.server` on a loopback port instead.
2077+
"""
2078+
import json
2079+
import threading
2080+
from http.server import BaseHTTPRequestHandler, HTTPServer
2081+
2082+
state = {"namespace_calls": 0}
2083+
config_body = json.dumps(
2084+
{"defaults": {}, "overrides": {}, "endpoints": [str(endpoint) for endpoint in TEST_SUPPORTED_ENDPOINTS]}
2085+
).encode()
2086+
2087+
class _Handler(BaseHTTPRequestHandler):
2088+
def do_GET(self) -> None:
2089+
if self.path.endswith("/v1/config"):
2090+
self._respond(200, config_body)
2091+
elif self.path.endswith("/v1/namespaces"):
2092+
state["namespace_calls"] += 1
2093+
if state["namespace_calls"] <= 3:
2094+
self._respond(503, b"")
2095+
else:
2096+
self._respond(200, json.dumps({"namespaces": [["foo"]]}).encode())
2097+
else:
2098+
self._respond(404, b"")
2099+
2100+
def _respond(self, status: int, body: bytes) -> None:
2101+
self.send_response(status)
2102+
self.send_header("Content-Type", "application/json")
2103+
self.send_header("Content-Length", str(len(body)))
2104+
self.end_headers()
2105+
if body:
2106+
self.wfile.write(body)
2107+
2108+
def log_message(self, format: str, *args: Any) -> None: # silence default access logs
2109+
pass
2110+
2111+
server = HTTPServer(("127.0.0.1", 0), _Handler)
2112+
port = server.server_address[1]
2113+
server_thread = threading.Thread(target=server.serve_forever, daemon=True)
2114+
server_thread.start()
2115+
try:
2116+
catalog = RestCatalog(
2117+
"rest",
2118+
**{ # type: ignore
2119+
"uri": f"http://127.0.0.1:{port}/",
2120+
"token": TEST_TOKEN,
2121+
# backoff-factor=0 keeps the test fast; retries=3 covers three 503s + the eventual 200.
2122+
CONNECTION: {CONNECTION_RETRIES: 3, CONNECTION_BACKOFF_FACTOR: 0},
2123+
},
2124+
)
2125+
assert catalog.list_namespaces() == [("foo",)]
2126+
assert state["namespace_calls"] == 4
2127+
finally:
2128+
server.shutdown()
2129+
server.server_close()
20692130

20702131

20712132
def test_session_with_invalid_connection_timeout_raises(rest_mock: Mocker) -> None:
@@ -2078,13 +2139,13 @@ def test_session_with_invalid_connection_timeout_raises(rest_mock: Mocker) -> No
20782139
RestCatalog("rest", **catalog_properties) # type: ignore
20792140

20802141

2081-
def test_session_with_invalid_connection_retry_kwarg_raises(rest_mock: Mocker) -> None:
2142+
def test_session_with_invalid_connection_retries_raises(rest_mock: Mocker) -> None:
20822143
catalog_properties = {
20832144
"uri": TEST_URI,
20842145
"token": TEST_TOKEN,
2085-
CONNECTION: {CONNECTION_RETRY: {"bogus_kwarg": 1}},
2146+
CONNECTION: {CONNECTION_RETRIES: -1},
20862147
}
2087-
with pytest.raises(ValueError, match="Invalid `connection.retry` configuration"):
2148+
with pytest.raises(ValueError, match="`connection.retries` must be non-negative"):
20882149
RestCatalog("rest", **catalog_properties) # type: ignore
20892150

20902151

0 commit comments

Comments
 (0)