Skip to content

Commit 7ff9522

Browse files
authored
fix(media): retry failed uploads (#1540)
1 parent 11d7ab6 commit 7ff9522

File tree

2 files changed

+109
-6
lines changed

2 files changed

+109
-6
lines changed

langfuse/_task_manager/media_manager.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -252,13 +252,36 @@ def _process_upload_media_job(
252252
headers["x-ms-blob-type"] = "BlockBlob"
253253
headers["x-amz-checksum-sha256"] = data["content_sha256_hash"]
254254

255+
def _upload_with_status_check() -> httpx.Response:
    """PUT the media bytes to ``upload_url`` and return the response.

    ``raise_for_status()`` is called on the response before it is
    returned, so an error status surfaces as an exception (the caller
    catches ``httpx.HTTPStatusError``) instead of a normal return value.
    """
    put_result = self._httpx_client.put(
        upload_url, headers=headers, content=data["content_bytes"]
    )
    put_result.raise_for_status()
    return put_result
264+
255265
upload_start_time = time.time()
256-
upload_response = self._request_with_backoff(
257-
self._httpx_client.put,
258-
upload_url,
259-
headers=headers,
260-
content=data["content_bytes"],
261-
)
266+
267+
try:
268+
upload_response = self._request_with_backoff(_upload_with_status_check)
269+
except httpx.HTTPStatusError as e:
270+
upload_time_ms = int((time.time() - upload_start_time) * 1000)
271+
failed_response = e.response
272+
273+
if failed_response is not None:
274+
self._request_with_backoff(
275+
self._api_client.media.patch,
276+
media_id=data["media_id"],
277+
uploaded_at=_get_timestamp(),
278+
upload_http_status=failed_response.status_code,
279+
upload_http_error=failed_response.text,
280+
upload_time_ms=upload_time_ms,
281+
)
282+
283+
raise
284+
262285
upload_time_ms = int((time.time() - upload_start_time) * 1000)
263286

264287
self._request_with_backoff(

tests/test_media_manager.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
from queue import Queue
2+
from types import SimpleNamespace
3+
from unittest.mock import Mock
4+
5+
import httpx
6+
import pytest
7+
8+
from langfuse._task_manager.media_manager import MediaManager
9+
10+
11+
def _upload_response(status_code: int, text: str = "") -> httpx.Response:
    """Build an ``httpx.Response`` for a PUT to the example upload URL.

    Args:
        status_code: HTTP status the fake response should report.
        text: Response body text; empty by default.

    Returns:
        A response carrying the given status and body, with the PUT
        request attached (presumably so ``raise_for_status()`` can be
        called on it -- confirm against the httpx docs).
    """
    return httpx.Response(
        status_code=status_code,
        request=httpx.Request("PUT", "https://example.com/upload"),
        text=text,
    )
14+
15+
16+
def _upload_job() -> dict:
17+
return {
18+
"media_id": "media-id",
19+
"content_bytes": b"payload",
20+
"content_type": "image/jpeg",
21+
"content_length": 7,
22+
"content_sha256_hash": "sha256hash",
23+
"trace_id": "trace-id",
24+
"observation_id": None,
25+
"field": "input",
26+
}
27+
28+
29+
def test_media_upload_retries_on_retryable_http_status():
    """A 503 on the first PUT is followed by a second PUT that returns 200.

    Expects exactly two PUT attempts and a single ``media.patch`` call
    that reports the final 200 status.
    """
    # Fake media API: hands out an upload URL and accepts the patch.
    upload_target = SimpleNamespace(
        upload_url="https://example.com/upload",
        media_id="media-id",
    )
    media_api = Mock()
    media_api.get_upload_url.return_value = upload_target
    media_api.patch.return_value = None

    # First attempt fails with 503, second one succeeds.
    put_responses = [
        _upload_response(503, "temporary failure"),
        _upload_response(200, "ok"),
    ]
    httpx_client = Mock()
    httpx_client.put.side_effect = put_responses

    api_client = SimpleNamespace(media=media_api)
    manager = MediaManager(
        api_client=api_client,
        httpx_client=httpx_client,
        media_upload_queue=Queue(),
        max_retries=3,
    )

    manager._process_upload_media_job(data=_upload_job())

    assert httpx_client.put.call_count == 2
    media_api.patch.assert_called_once()
    _, patch_kwargs = media_api.patch.call_args
    assert patch_kwargs["upload_http_status"] == 200
55+
56+
57+
def test_media_upload_gives_up_on_non_retryable_http_status():
    """A 403 on the PUT is not retried and propagates to the caller.

    Expects a single PUT attempt, an ``httpx.HTTPStatusError`` raised
    out of the job, and one ``media.patch`` call reporting the 403.
    """
    # Fake media API: hands out an upload URL and accepts the patch.
    upload_target = SimpleNamespace(
        upload_url="https://example.com/upload",
        media_id="media-id",
    )
    media_api = Mock()
    media_api.get_upload_url.return_value = upload_target
    media_api.patch.return_value = None

    # Every PUT attempt answers 403.
    forbidden = _upload_response(403, "forbidden")
    httpx_client = Mock()
    httpx_client.put.return_value = forbidden

    api_client = SimpleNamespace(media=media_api)
    manager = MediaManager(
        api_client=api_client,
        httpx_client=httpx_client,
        media_upload_queue=Queue(),
        max_retries=3,
    )

    with pytest.raises(httpx.HTTPStatusError):
        manager._process_upload_media_job(data=_upload_job())

    assert httpx_client.put.call_count == 1
    media_api.patch.assert_called_once()
    _, patch_kwargs = media_api.patch.call_args
    assert patch_kwargs["upload_http_status"] == 403

0 commit comments

Comments
 (0)