Skip to content

Commit cfd3ac1

Browse files
authored
fix(media): reuse httpx client for requests (#1537)
1 parent 89730bf commit cfd3ac1

File tree

7 files changed

+88
-119
lines changed

7 files changed

+88
-119
lines changed

langfuse/_client/resource_manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ def _initialize_instance(
249249
self._media_upload_queue: Queue[Any] = Queue(100_000)
250250
self._media_manager = MediaManager(
251251
api_client=self.api,
252+
httpx_client=self.httpx_client,
252253
media_upload_queue=self._media_upload_queue,
253254
max_retries=3,
254255
)

langfuse/_task_manager/media_manager.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from typing import Any, Callable, Optional, TypeVar, cast
66

77
import backoff
8-
import requests
8+
import httpx
99
from typing_extensions import ParamSpec
1010

1111
from langfuse._client.environment_variables import LANGFUSE_MEDIA_UPLOAD_ENABLED
@@ -29,10 +29,12 @@ def __init__(
2929
self,
3030
*,
3131
api_client: FernLangfuse,
32+
httpx_client: httpx.Client,
3233
media_upload_queue: Queue,
3334
max_retries: Optional[int] = 3,
3435
):
3536
self._api_client = api_client
37+
self._httpx_client = httpx_client
3638
self._queue = media_upload_queue
3739
self._max_retries = max_retries
3840
self._enabled = os.environ.get(
@@ -256,10 +258,10 @@ def _process_upload_media_job(
256258

257259
upload_start_time = time.time()
258260
upload_response = self._request_with_backoff(
259-
requests.put,
261+
self._httpx_client.put,
260262
upload_url,
261263
headers=headers,
262-
data=data["content_bytes"],
264+
content=data["content_bytes"],
263265
)
264266
upload_time_ms = int((time.time() - upload_start_time) * 1000)
265267

@@ -288,7 +290,7 @@ def _should_give_up(e: Exception) -> bool:
288290
and 400 <= e.status_code < 500
289291
and e.status_code != 429
290292
)
291-
if isinstance(e, requests.exceptions.RequestException):
293+
if isinstance(e, httpx.HTTPStatusError):
292294
return (
293295
e.response is not None
294296
and e.response.status_code < 500

langfuse/media.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import re
88
from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, TypeVar, cast
99

10-
import requests
10+
import httpx
1111

1212
if TYPE_CHECKING:
1313
from langfuse._client.client import Langfuse
@@ -284,6 +284,11 @@ def traverse(obj: Any, depth: int) -> Any:
284284

285285
result = obj
286286
reference_string_to_media_content = {}
287+
httpx_client = (
288+
langfuse_client._resources.httpx_client
289+
if langfuse_client._resources is not None
290+
else None
291+
)
287292

288293
for reference_string in reference_string_matches:
289294
try:
@@ -293,11 +298,17 @@ def traverse(obj: Any, depth: int) -> Any:
293298
media_data = langfuse_client.api.media.get(
294299
parsed_media_reference["media_id"]
295300
)
296-
media_content = requests.get(
297-
media_data.url, timeout=content_fetch_timeout_seconds
301+
media_content = (
302+
httpx_client.get(
303+
media_data.url,
304+
timeout=content_fetch_timeout_seconds,
305+
)
306+
if httpx_client is not None
307+
else httpx.get(
308+
media_data.url, timeout=content_fetch_timeout_seconds
309+
)
298310
)
299-
if not media_content.ok:
300-
raise Exception("Failed to fetch media content")
311+
media_content.raise_for_status()
301312

302313
base64_media_content = base64.b64encode(
303314
media_content.content

0 commit comments

Comments
 (0)