Skip to content

Commit c457603

Browse files
authored
Merge branch 'main' into deprecate-trace-updates
2 parents aa8f3b3 + cfd3ac1 commit c457603

18 files changed

Lines changed: 273 additions & 86 deletions

.github/workflows/ci.yml

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -19,9 +19,7 @@ jobs:
1919
runs-on: ubuntu-latest
2020
steps:
2121
- uses: actions/checkout@v3
22-
- uses: chartboost/ruff-action@v1
23-
with:
24-
args: check --config ci.ruff.toml
22+
- uses: astral-sh/ruff-action@v3
2523

2624
type-checking:
2725
runs-on: ubuntu-latest

.pre-commit-config.yaml

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,16 +1,15 @@
11
repos:
22
- repo: https://github.com/astral-sh/ruff-pre-commit
3-
rev: v0.14.4
3+
rev: v0.15.2
44
hooks:
55
# Run the linter and fix
6-
- id: ruff
6+
- id: ruff-check
77
types_or: [python, pyi, jupyter]
8-
args: [--fix, --config=ci.ruff.toml]
8+
args: [--fix]
99

1010
# Run the formatter.
1111
- id: ruff-format
1212
types_or: [python, pyi, jupyter]
13-
args: [--config=ci.ruff.toml]
1413

1514
- repo: https://github.com/pre-commit/mirrors-mypy
1615
rev: v1.18.2

CLAUDE.md

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -114,8 +114,6 @@ Environment variables (defined in `_client/environment_variables.py`):
114114
## Important Files
115115

116116
- `pyproject.toml`: Poetry configuration, dependencies, and tool settings
117-
- `ruff.toml`: Local development linting config (stricter)
118-
- `ci.ruff.toml`: CI linting config (more permissive)
119117
- `langfuse/version.py`: Version string (updated by CI release workflow)
120118

121119
## API Generation

ci.ruff.toml

Lines changed: 0 additions & 6 deletions
This file was deleted.

langfuse/_client/client.py

Lines changed: 40 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -95,6 +95,7 @@
9595
Prompt_Chat,
9696
Prompt_Text,
9797
ScoreBody,
98+
TraceBody,
9899
)
99100
from langfuse.batch_evaluation import (
100101
BatchEvaluationResult,
@@ -1839,6 +1840,39 @@ def create_score(
18391840
f"Error creating score: Failed to process score event for trace_id={trace_id}, name={name}. Error: {e}"
18401841
)
18411842

1843+
def _create_trace_tags_via_ingestion(
1844+
self,
1845+
*,
1846+
trace_id: str,
1847+
tags: List[str],
1848+
) -> None:
1849+
"""Private helper to enqueue trace tag updates via ingestion API events."""
1850+
if not self._tracing_enabled:
1851+
return
1852+
1853+
if len(tags) == 0:
1854+
return
1855+
1856+
try:
1857+
new_body = TraceBody(
1858+
id=trace_id,
1859+
tags=tags,
1860+
)
1861+
1862+
event = {
1863+
"id": self.create_trace_id(),
1864+
"type": "trace-create",
1865+
"timestamp": _get_timestamp(),
1866+
"body": new_body,
1867+
}
1868+
1869+
if self._resources is not None:
1870+
self._resources.add_trace_task(event)
1871+
except Exception as e:
1872+
langfuse_logger.exception(
1873+
f"Error updating trace tags: Failed to process trace update event for trace_id={trace_id}. Error: {e}"
1874+
)
1875+
18421876
@overload
18431877
def score_current_span(
18441878
self,
@@ -2866,8 +2900,10 @@ def run_batched_evaluation(
28662900
max_retries: int = 3,
28672901
evaluators: List[EvaluatorFunction],
28682902
composite_evaluator: Optional[CompositeEvaluatorFunction] = None,
2869-
max_concurrency: int = 50,
2903+
max_concurrency: int = 5,
28702904
metadata: Optional[Dict[str, Any]] = None,
2905+
_add_observation_scores_to_trace: bool = False,
2906+
_additional_trace_tags: Optional[List[str]] = None,
28712907
resume_from: Optional[BatchEvaluationResumeToken] = None,
28722908
verbose: bool = False,
28732909
) -> BatchEvaluationResult:
@@ -2909,7 +2945,7 @@ def run_batched_evaluation(
29092945
items matching the filter. Useful for testing or limiting evaluation runs.
29102946
Default: None (process all).
29112947
max_concurrency: Maximum number of items to evaluate concurrently. Controls
2912-
parallelism and resource usage. Default: 50.
2948+
parallelism and resource usage. Default: 5.
29132949
composite_evaluator: Optional function that creates a composite score from
29142950
item-level evaluations. Receives the original item and its evaluations,
29152951
returns a single Evaluation. Useful for weighted averages or combined metrics.
@@ -3078,6 +3114,8 @@ def composite_evaluator(*, item, evaluations):
30783114
max_concurrency=max_concurrency,
30793115
composite_evaluator=composite_evaluator,
30803116
metadata=metadata,
3117+
_add_observation_scores_to_trace=_add_observation_scores_to_trace,
3118+
_additional_trace_tags=_additional_trace_tags,
30813119
max_retries=max_retries,
30823120
verbose=verbose,
30833121
resume_from=resume_from,

langfuse/_client/resource_manager.py

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -254,6 +254,7 @@ def _initialize_instance(
254254
self._media_upload_queue: Queue[Any] = Queue(100_000)
255255
self._media_manager = MediaManager(
256256
api_client=self.api,
257+
httpx_client=self.httpx_client,
257258
media_upload_queue=self._media_upload_queue,
258259
max_retries=3,
259260
)
@@ -356,6 +357,29 @@ def add_score_task(self, event: dict, *, force_sample: bool = False) -> None:
356357

357358
return
358359

360+
def add_trace_task(
361+
self,
362+
event: dict,
363+
) -> None:
364+
try:
365+
langfuse_logger.debug(
366+
f"Trace: Enqueuing event type={event['type']} for trace_id={event['body'].id}"
367+
)
368+
self._score_ingestion_queue.put(event, block=False)
369+
370+
except Full:
371+
langfuse_logger.warning(
372+
"System overload: Trace ingestion queue has reached capacity (100,000 items). Trace update will be dropped. Consider increasing flush frequency or decreasing event volume."
373+
)
374+
375+
return
376+
except Exception as e:
377+
langfuse_logger.error(
378+
f"Unexpected error: Failed to process trace event. The trace update will be dropped. Error details: {e}"
379+
)
380+
381+
return
382+
359383
@property
360384
def tracer(self) -> Optional[Tracer]:
361385
return self._otel_tracer

langfuse/_task_manager/media_manager.py

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
from typing import Any, Callable, Optional, TypeVar, cast
66

77
import backoff
8-
import requests
8+
import httpx
99
from typing_extensions import ParamSpec
1010

1111
from langfuse._client.environment_variables import LANGFUSE_MEDIA_UPLOAD_ENABLED
@@ -27,10 +27,12 @@ def __init__(
2727
self,
2828
*,
2929
api_client: LangfuseAPI,
30+
httpx_client: httpx.Client,
3031
media_upload_queue: Queue,
3132
max_retries: Optional[int] = 3,
3233
):
3334
self._api_client = api_client
35+
self._httpx_client = httpx_client
3436
self._queue = media_upload_queue
3537
self._max_retries = max_retries
3638
self._enabled = os.environ.get(
@@ -252,10 +254,10 @@ def _process_upload_media_job(
252254

253255
upload_start_time = time.time()
254256
upload_response = self._request_with_backoff(
255-
requests.put,
257+
self._httpx_client.put,
256258
upload_url,
257259
headers=headers,
258-
data=data["content_bytes"],
260+
content=data["content_bytes"],
259261
)
260262
upload_time_ms = int((time.time() - upload_start_time) * 1000)
261263

@@ -282,7 +284,7 @@ def _should_give_up(e: Exception) -> bool:
282284
and 400 <= e.status_code < 500
283285
and e.status_code != 429
284286
)
285-
if isinstance(e, requests.exceptions.RequestException):
287+
if isinstance(e, httpx.HTTPStatusError):
286288
return (
287289
e.response is not None
288290
and e.response.status_code < 500

0 commit comments

Comments (0)