Skip to content

Commit 50051ad

Browse files
Ma77Ballgithub-actions[bot]
authored andcommitted
fix(amber): add timeout and retry to dataset file-service requests (#5667)
### What changes were proposed in this PR? - Route `DatasetFileDocument`'s presigned-URL fetch and file download through a `requests.Session` configured with a `(5s connect, 10s read)` timeout, so a hung or unreachable file-service fails in bounded time instead of blocking the worker thread forever. The read timeout bounds inactivity *between bytes*, not the total download time, so large dataset files that stream steadily are unaffected; it only trips when the connection stalls for 10s with no data. - Mount a `urllib3` `Retry` policy on the session (3 retries, exponential backoff, retrying on connection errors and 5xx). Both calls are idempotent GETs, so the retry set is restricted to `GET`. - Translate network failures (connect/read timeouts and connection errors, including those surfaced after retries are exhausted) into `RuntimeError`, consistent with the module's existing failure handling, so callers get a uniform error contract instead of a raw `requests`/`urllib3` exception. ### Any related issues, documentation, discussions? Closes: #5666 ### How was this PR tested? - Added `pytest` coverage in `test_dataset_file_document.py` (26 tests): - asserts the `(connect, read)` timeout tuple is passed on both the presigned-URL request and the file download; - asserts the retry adapter is mounted for `http://` and `https://` with the expected policy (`total=3`, `connect=3`, `read=3`, `backoff_factor=0.5`, `status_forcelist={500,502,503,504}`, GET-only); - asserts a `ReadTimeout` / `ConnectionError` is wrapped in `RuntimeError` on both code paths. - `ruff check` and `ruff format --check` pass on the modified files. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF (backported from commit 86f865c)
1 parent e48f79e commit 50051ad

4 files changed

Lines changed: 160 additions & 17 deletions

File tree

amber/LICENSE-binary-python

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ Python packages:
228228
- pympler==1.1
229229
- python-dateutil==2.8.2
230230
- regex==2026.5.9
231-
- requests==2.34.2
231+
- requests==2.34.0
232232
- s3transfer==0.14.0
233233
- safetensors==0.8.0
234234
- tenacity==8.5.0

amber/requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,5 @@ SQLAlchemy==2.0.37
4848
pg8000==1.31.5
4949
pympler==1.1
5050
boto3==1.40.53
51+
requests==2.34.0
52+
urllib3==2.7.0

amber/src/main/python/pytexera/storage/dataset_file_document.py

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,38 @@
1919
import os
2020
import requests
2121
import urllib.parse
22+
from requests.adapters import HTTPAdapter
23+
from urllib3.util.retry import Retry
2224

2325

2426
class DatasetFileDocument:
27+
# (connect, read) timeout and retry settings for the file-service GETs below.
28+
# Read timeout bounds inactivity between bytes, not total download time.
29+
_CONNECT_TIMEOUT_SECONDS = 5
30+
_READ_TIMEOUT_SECONDS = 10
31+
_REQUEST_TIMEOUT = (_CONNECT_TIMEOUT_SECONDS, _READ_TIMEOUT_SECONDS)
32+
_MAX_RETRIES = 3
33+
_RETRY_BACKOFF_FACTOR = 0.5
34+
_RETRY_STATUS_FORCELIST = (500, 502, 503, 504)
35+
36+
@classmethod
37+
def _retry_session(cls) -> requests.Session:
38+
"""Returns a Session that retries GETs on connection errors and 5xx."""
39+
retry = Retry(
40+
total=cls._MAX_RETRIES,
41+
connect=cls._MAX_RETRIES,
42+
read=cls._MAX_RETRIES,
43+
backoff_factor=cls._RETRY_BACKOFF_FACTOR,
44+
status_forcelist=cls._RETRY_STATUS_FORCELIST,
45+
allowed_methods=frozenset({"GET"}),
46+
raise_on_status=False,
47+
)
48+
adapter = HTTPAdapter(max_retries=retry)
49+
session = requests.Session()
50+
session.mount("http://", adapter)
51+
session.mount("https://", adapter)
52+
return session
53+
2554
def __init__(self, file_path: str):
2655
"""
2756
Parses the file path into dataset metadata.
@@ -69,7 +98,18 @@ def get_presigned_url(self) -> str:
6998

7099
params = {"filePath": encoded_file_path}
71100

72-
response = requests.get(self.presign_endpoint, headers=headers, params=params)
101+
try:
102+
with self._retry_session() as session:
103+
response = session.get(
104+
self.presign_endpoint,
105+
headers=headers,
106+
params=params,
107+
timeout=self._REQUEST_TIMEOUT,
108+
)
109+
except requests.exceptions.RequestException as e:
110+
raise RuntimeError(
111+
f"Failed to get presigned URL: request failed: {e}"
112+
) from e
73113

74114
if response.status_code != 200:
75115
raise RuntimeError(
@@ -100,7 +140,13 @@ def read_file(self) -> io.BytesIO:
100140
:raises: RuntimeError if the retrieval fails.
101141
"""
102142
presigned_url = self.get_presigned_url()
103-
response = requests.get(presigned_url)
143+
try:
144+
with self._retry_session() as session:
145+
response = session.get(presigned_url, timeout=self._REQUEST_TIMEOUT)
146+
except requests.exceptions.RequestException as e:
147+
raise RuntimeError(
148+
f"Failed to retrieve file content: request failed: {e}"
149+
) from e
104150

105151
if response.status_code != 200:
106152
raise RuntimeError(

amber/src/test/python/pytexera/storage/test_dataset_file_document.py

Lines changed: 109 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
import io
1919

2020
import pytest
21+
import requests
2122
from unittest.mock import patch, MagicMock
2223

2324
from pytexera.storage.dataset_file_document import DatasetFileDocument
2425

25-
2626
DEFAULT_ENDPOINT = "http://localhost:9092/api/dataset/presign-download"
2727
CUSTOM_ENDPOINT = "https://example.test/api/presign"
2828

@@ -95,15 +95,19 @@ def _make_doc(self, monkeypatch, path="/bob@x.com/ds/v1/file.csv"):
9595

9696
def test_returns_presigned_url_field_from_json_body(self, monkeypatch):
9797
doc = self._make_doc(monkeypatch)
98-
with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get:
98+
with patch(
99+
"pytexera.storage.dataset_file_document.requests.Session.get"
100+
) as mock_get:
99101
mock_get.return_value = make_response(
100102
200, body={"presignedUrl": "https://signed.test/x"}
101103
)
102104
assert doc.get_presigned_url() == "https://signed.test/x"
103105

104106
def test_sends_bearer_authorization_header_with_jwt(self, monkeypatch):
105107
doc = self._make_doc(monkeypatch)
106-
with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get:
108+
with patch(
109+
"pytexera.storage.dataset_file_document.requests.Session.get"
110+
) as mock_get:
107111
mock_get.return_value = make_response(200, body={"presignedUrl": "u"})
108112
doc.get_presigned_url()
109113
_, kwargs = mock_get.call_args
@@ -113,7 +117,9 @@ def test_url_encodes_filepath_query_parameter(self, monkeypatch):
113117
# urllib.parse.quote keeps "/" as safe by default, but encodes "@"
114118
# and " " — pin both pieces so the contract is explicit.
115119
doc = self._make_doc(monkeypatch, path="/bob@x.com/ds/v1/data file.csv")
116-
with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get:
120+
with patch(
121+
"pytexera.storage.dataset_file_document.requests.Session.get"
122+
) as mock_get:
117123
mock_get.return_value = make_response(200, body={"presignedUrl": "u"})
118124
doc.get_presigned_url()
119125
_, kwargs = mock_get.call_args
@@ -124,29 +130,37 @@ def test_url_encodes_filepath_query_parameter(self, monkeypatch):
124130

125131
def test_calls_configured_endpoint(self, monkeypatch):
126132
doc = self._make_doc(monkeypatch)
127-
with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get:
133+
with patch(
134+
"pytexera.storage.dataset_file_document.requests.Session.get"
135+
) as mock_get:
128136
mock_get.return_value = make_response(200, body={"presignedUrl": "u"})
129137
doc.get_presigned_url()
130138
args, _ = mock_get.call_args
131139
assert args[0] == CUSTOM_ENDPOINT
132140

133141
def test_raises_runtime_error_with_status_and_body_on_failure(self, monkeypatch):
134142
doc = self._make_doc(monkeypatch)
135-
with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get:
143+
with patch(
144+
"pytexera.storage.dataset_file_document.requests.Session.get"
145+
) as mock_get:
136146
mock_get.return_value = make_response(403, body="forbidden")
137147
with pytest.raises(RuntimeError, match=r"403.*forbidden"):
138148
doc.get_presigned_url()
139149

140150
def test_raises_when_response_body_lacks_presigned_url_key(self, monkeypatch):
141151
doc = self._make_doc(monkeypatch)
142-
with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get:
152+
with patch(
153+
"pytexera.storage.dataset_file_document.requests.Session.get"
154+
) as mock_get:
143155
mock_get.return_value = make_response(200, body={"other": "value"})
144156
with pytest.raises(RuntimeError, match="'presignedUrl' missing"):
145157
doc.get_presigned_url()
146158

147159
def test_raises_when_response_body_is_not_valid_json(self, monkeypatch):
148160
doc = self._make_doc(monkeypatch)
149-
with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get:
161+
with patch(
162+
"pytexera.storage.dataset_file_document.requests.Session.get"
163+
) as mock_get:
150164
response = MagicMock()
151165
response.status_code = 200
152166
response.json.side_effect = ValueError("Expecting value")
@@ -157,14 +171,18 @@ def test_raises_when_response_body_is_not_valid_json(self, monkeypatch):
157171

158172
def test_raises_when_presigned_url_is_empty_string(self, monkeypatch):
159173
doc = self._make_doc(monkeypatch)
160-
with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get:
174+
with patch(
175+
"pytexera.storage.dataset_file_document.requests.Session.get"
176+
) as mock_get:
161177
mock_get.return_value = make_response(200, body={"presignedUrl": ""})
162178
with pytest.raises(RuntimeError, match="'presignedUrl' missing"):
163179
doc.get_presigned_url()
164180

165181
def test_raises_when_presigned_url_is_not_a_string(self, monkeypatch):
166182
doc = self._make_doc(monkeypatch)
167-
with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get:
183+
with patch(
184+
"pytexera.storage.dataset_file_document.requests.Session.get"
185+
) as mock_get:
168186
mock_get.return_value = make_response(200, body={"presignedUrl": None})
169187
with pytest.raises(RuntimeError, match="'presignedUrl' missing"):
170188
doc.get_presigned_url()
@@ -178,7 +196,9 @@ def _make_doc(self, monkeypatch):
178196

179197
def test_returns_bytesio_with_downloaded_content(self, monkeypatch):
180198
doc = self._make_doc(monkeypatch)
181-
with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get:
199+
with patch(
200+
"pytexera.storage.dataset_file_document.requests.Session.get"
201+
) as mock_get:
182202
mock_get.side_effect = [
183203
make_response(200, body={"presignedUrl": "https://signed.test/x"}),
184204
make_response(200, content=b"hello-bytes"),
@@ -189,14 +209,18 @@ def test_returns_bytesio_with_downloaded_content(self, monkeypatch):
189209

190210
def test_propagates_presigned_url_failure(self, monkeypatch):
191211
doc = self._make_doc(monkeypatch)
192-
with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get:
212+
with patch(
213+
"pytexera.storage.dataset_file_document.requests.Session.get"
214+
) as mock_get:
193215
mock_get.return_value = make_response(500, body="upstream down")
194216
with pytest.raises(RuntimeError, match=r"500.*upstream down"):
195217
doc.read_file()
196218

197219
def test_raises_runtime_error_when_download_fails(self, monkeypatch):
198220
doc = self._make_doc(monkeypatch)
199-
with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get:
221+
with patch(
222+
"pytexera.storage.dataset_file_document.requests.Session.get"
223+
) as mock_get:
200224
mock_get.side_effect = [
201225
make_response(200, body={"presignedUrl": "https://signed.test/x"}),
202226
make_response(404, body="missing"),
@@ -206,11 +230,82 @@ def test_raises_runtime_error_when_download_fails(self, monkeypatch):
206230

207231
def test_downloads_from_presigned_url_returned_by_first_call(self, monkeypatch):
208232
doc = self._make_doc(monkeypatch)
209-
with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get:
233+
with patch(
234+
"pytexera.storage.dataset_file_document.requests.Session.get"
235+
) as mock_get:
210236
mock_get.side_effect = [
211237
make_response(200, body={"presignedUrl": "https://signed.test/x"}),
212238
make_response(200, content=b""),
213239
]
214240
doc.read_file()
215241
second_call_args, _ = mock_get.call_args_list[1]
216242
assert second_call_args[0] == "https://signed.test/x"
243+
244+
245+
class TestTimeoutsAndRetries:
246+
def _make_doc(self, monkeypatch):
247+
monkeypatch.setenv("USER_JWT_TOKEN", "test-jwt-token")
248+
monkeypatch.setenv("FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT", CUSTOM_ENDPOINT)
249+
return DatasetFileDocument("/bob@x.com/ds/v1/file.csv")
250+
251+
def test_presigned_url_request_passes_request_timeout(self, monkeypatch):
252+
doc = self._make_doc(monkeypatch)
253+
with patch(
254+
"pytexera.storage.dataset_file_document.requests.Session.get"
255+
) as mock_get:
256+
mock_get.return_value = make_response(200, body={"presignedUrl": "u"})
257+
doc.get_presigned_url()
258+
_, kwargs = mock_get.call_args
259+
assert kwargs["timeout"] == DatasetFileDocument._REQUEST_TIMEOUT
260+
261+
def test_download_request_passes_request_timeout(self, monkeypatch):
262+
doc = self._make_doc(monkeypatch)
263+
with patch(
264+
"pytexera.storage.dataset_file_document.requests.Session.get"
265+
) as mock_get:
266+
mock_get.side_effect = [
267+
make_response(200, body={"presignedUrl": "https://signed.test/x"}),
268+
make_response(200, content=b"data"),
269+
]
270+
doc.read_file()
271+
_, download_kwargs = mock_get.call_args_list[1]
272+
assert download_kwargs["timeout"] == DatasetFileDocument._REQUEST_TIMEOUT
273+
274+
def test_session_mounts_retry_adapter_for_http_and_https(self):
275+
session = DatasetFileDocument._retry_session()
276+
try:
277+
for prefix in ("http://", "https://"):
278+
retry = session.get_adapter(prefix).max_retries
279+
assert retry.total == DatasetFileDocument._MAX_RETRIES
280+
assert retry.connect == DatasetFileDocument._MAX_RETRIES
281+
assert retry.read == DatasetFileDocument._MAX_RETRIES
282+
assert set(retry.status_forcelist) == set(
283+
DatasetFileDocument._RETRY_STATUS_FORCELIST
284+
)
285+
# Only idempotent GETs should be retried.
286+
assert retry.allowed_methods == frozenset({"GET"})
287+
finally:
288+
session.close()
289+
290+
def test_presigned_url_request_timeout_is_wrapped_in_runtime_error(
291+
self, monkeypatch
292+
):
293+
doc = self._make_doc(monkeypatch)
294+
with patch(
295+
"pytexera.storage.dataset_file_document.requests.Session.get"
296+
) as mock_get:
297+
mock_get.side_effect = requests.exceptions.ReadTimeout("timed out")
298+
with pytest.raises(RuntimeError, match="request failed"):
299+
doc.get_presigned_url()
300+
301+
def test_download_request_timeout_is_wrapped_in_runtime_error(self, monkeypatch):
302+
doc = self._make_doc(monkeypatch)
303+
with patch(
304+
"pytexera.storage.dataset_file_document.requests.Session.get"
305+
) as mock_get:
306+
mock_get.side_effect = [
307+
make_response(200, body={"presignedUrl": "https://signed.test/x"}),
308+
requests.exceptions.ConnectionError("connection reset"),
309+
]
310+
with pytest.raises(RuntimeError, match="Failed to retrieve file content"):
311+
doc.read_file()

0 commit comments

Comments
 (0)