Skip to content

Commit e32aeb2

Browse files
authored
fix(experiments): fix unstable local experiment IDs for local data (#1600)
1 parent a3f15c6 commit e32aeb2

File tree

3 files changed

+64
-8
lines changed

3 files changed

+64
-8
lines changed

langfuse/_client/client.py

Lines changed: 15 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2427,6 +2427,7 @@ def run_experiment(
24272427
- run_name: The experiment run name. This is equal to the dataset run name if experiment was on Langfuse dataset.
24282428
- item_results: List of results for each processed item with outputs and evaluations
24292429
- run_evaluations: List of aggregate evaluation results for the entire run
2430+
- experiment_id: Stable identifier for the experiment run across all items
24302431
- dataset_run_id: ID of the dataset run (if using Langfuse datasets)
24312432
- dataset_run_url: Direct URL to view results in Langfuse UI (if applicable)
24322433
@@ -2577,6 +2578,8 @@ async def _run_experiment_async(
25772578
f"Starting experiment '{name}' run '{run_name}' with {len(data)} items"
25782579
)
25792580

2581+
shared_fallback_experiment_id = self._create_observation_id()
2582+
25802583
# Set up concurrency control
25812584
semaphore = asyncio.Semaphore(max_concurrency)
25822585

@@ -2588,6 +2591,7 @@ async def process_item(item: ExperimentItem) -> ExperimentItemResult:
25882591
task,
25892592
evaluators,
25902593
composite_evaluator,
2594+
shared_fallback_experiment_id,
25912595
name,
25922596
run_name,
25932597
description,
@@ -2619,7 +2623,14 @@ async def process_item(item: ExperimentItem) -> ExperimentItemResult:
26192623
langfuse_logger.error(f"Run evaluator failed: {e}")
26202624

26212625
# Generate dataset run URL if applicable
2622-
dataset_run_id = valid_results[0].dataset_run_id if valid_results else None
2626+
dataset_run_id = next(
2627+
(
2628+
result.dataset_run_id
2629+
for result in valid_results
2630+
if result.dataset_run_id
2631+
),
2632+
None,
2633+
)
26232634
dataset_run_url = None
26242635
if dataset_run_id and data:
26252636
try:
@@ -2665,6 +2676,7 @@ async def process_item(item: ExperimentItem) -> ExperimentItemResult:
26652676
description=description,
26662677
item_results=valid_results,
26672678
run_evaluations=run_evaluations,
2679+
experiment_id=dataset_run_id or shared_fallback_experiment_id,
26682680
dataset_run_id=dataset_run_id,
26692681
dataset_run_url=dataset_run_url,
26702682
)
@@ -2675,6 +2687,7 @@ async def _process_experiment_item(
26752687
task: Callable,
26762688
evaluators: List[Callable],
26772689
composite_evaluator: Optional[CompositeEvaluatorFunction],
2690+
fallback_experiment_id: str,
26782691
experiment_name: str,
26792692
experiment_run_name: str,
26802693
experiment_description: Optional[str],
@@ -2753,7 +2766,7 @@ async def _process_experiment_item(
27532766
if isinstance(item_metadata, dict):
27542767
final_observation_metadata.update(item_metadata)
27552768

2756-
experiment_id = dataset_run_id or self._create_observation_id()
2769+
experiment_id = dataset_run_id or fallback_experiment_id
27572770
experiment_item_id = (
27582771
dataset_item_id or get_sha256_hash_hex(_serialize(input_data))[:16]
27592772
)

langfuse/experiment.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -303,6 +303,9 @@ class ExperimentResult:
303303
containing the original item, task output, evaluations, and trace information.
304304
run_evaluations: List of aggregate evaluation results computed across all items,
305305
such as average scores, statistical summaries, or cross-item analyses.
306+
experiment_id: ID of the experiment run propagated across all items. For
307+
Langfuse datasets, this matches the dataset run ID. For local experiments,
308+
this is a stable SDK-generated identifier for the run.
306309
dataset_run_id: Optional ID of the dataset run in Langfuse (when using Langfuse datasets).
307310
dataset_run_url: Optional direct URL to view the experiment results in Langfuse UI.
308311
@@ -361,6 +364,7 @@ def __init__(
361364
description: Optional[str],
362365
item_results: List[ExperimentItemResult],
363366
run_evaluations: List[Evaluation],
367+
experiment_id: str,
364368
dataset_run_id: Optional[str] = None,
365369
dataset_run_url: Optional[str] = None,
366370
):
@@ -372,6 +376,7 @@ def __init__(
372376
description: Optional description of the experiment.
373377
item_results: List of results from processing individual dataset items.
374378
run_evaluations: List of aggregate evaluation results for the entire run.
379+
experiment_id: ID of the experiment run.
375380
dataset_run_id: Optional ID of the dataset run (for Langfuse datasets).
376381
dataset_run_url: Optional URL to view results in Langfuse UI.
377382
"""
@@ -380,6 +385,7 @@ def __init__(
380385
self.description = description
381386
self.item_results = item_results
382387
self.run_evaluations = run_evaluations
388+
self.experiment_id = experiment_id
383389
self.dataset_run_id = dataset_run_id
384390
self.dataset_run_url = dataset_run_url
385391

tests/test_propagate_attributes.py

Lines changed: 43 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -2384,6 +2384,7 @@ def task_with_child_spans(*, item, **kwargs):
23842384
experiment_id = first_root["attributes"][
23852385
LangfuseOtelSpanAttributes.EXPERIMENT_ID
23862386
]
2387+
assert result.experiment_id == experiment_id
23872388
experiment_item_id = first_root["attributes"][
23882389
LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID
23892390
]
@@ -2478,25 +2479,55 @@ def task_with_child_spans(*, item, **kwargs):
24782479
LangfuseOtelSpanAttributes.EXPERIMENT_DATASET_ID,
24792480
)
24802481

2482+
def test_experiment_id_is_stable_across_local_items(
2483+
self, langfuse_client, memory_exporter
2484+
):
2485+
"""Test local experiments reuse one experiment ID across all items."""
2486+
local_data = [
2487+
{"input": "test input 1", "expected_output": "expected result 1"},
2488+
{"input": "test input 2", "expected_output": "expected result 2"},
2489+
]
2490+
2491+
result = langfuse_client.run_experiment(
2492+
name="Stable Local Experiment",
2493+
data=local_data,
2494+
task=lambda *, item, **kwargs: f"processed: {item['input']}",
2495+
)
2496+
2497+
langfuse_client.flush()
2498+
time.sleep(0.1)
2499+
2500+
root_spans = self.get_spans_by_name(memory_exporter, "experiment-item-run")
2501+
experiment_ids = {
2502+
span["attributes"][LangfuseOtelSpanAttributes.EXPERIMENT_ID]
2503+
for span in root_spans
2504+
}
2505+
2506+
assert len(experiment_ids) == 1
2507+
assert result.experiment_id == next(iter(experiment_ids))
2508+
24812509
def test_experiment_attributes_propagate_with_dataset(
24822510
self, langfuse_client, memory_exporter, monkeypatch
24832511
):
24842512
"""Test experiment attribute propagation with Langfuse dataset."""
24852513

2486-
# Mock the async API to create dataset run items
2487-
async def mock_create_dataset_run_item(*args, **kwargs):
2514+
# Mock the sync API used by run_experiment to create dataset run items
2515+
def mock_create_dataset_run_item(*args, **kwargs):
24882516
from langfuse.api import DatasetRunItem
24892517

2490-
request = kwargs.get("request")
24912518
return DatasetRunItem(
24922519
id="mock-run-item-id",
24932520
dataset_run_id="mock-dataset-run-id-123",
2494-
dataset_item_id=request.datasetItemId if request else "mock-item-id",
2521+
dataset_run_name=kwargs.get("run_name", "Dataset Test"),
2522+
dataset_item_id=kwargs.get("dataset_item_id", "mock-item-id"),
24952523
trace_id="mock-trace-id",
2524+
observation_id=kwargs.get("observation_id"),
2525+
created_at=datetime.now(),
2526+
updated_at=datetime.now(),
24962527
)
24972528

24982529
monkeypatch.setattr(
2499-
langfuse_client.async_api.dataset_run_items,
2530+
langfuse_client.api.dataset_run_items,
25002531
"create",
25012532
mock_create_dataset_run_item,
25022533
)
@@ -2548,7 +2579,7 @@ def task_with_children(*, item, **kwargs):
25482579

25492580
# Run experiment
25502581
experiment_metadata = {"dataset_version": "v2", "test_run": "true"}
2551-
dataset.run_experiment(
2582+
result = dataset.run_experiment(
25522583
name="Dataset Test",
25532584
description="Dataset experiment description",
25542585
task=task_with_children,
@@ -2562,6 +2593,7 @@ def task_with_children(*, item, **kwargs):
25622593
root_spans = self.get_spans_by_name(memory_exporter, "experiment-item-run")
25632594
assert len(root_spans) >= 1, "Should have at least 1 root span"
25642595
first_root = root_spans[0]
2596+
assert result.experiment_id == "mock-dataset-run-id-123"
25652597

25662598
# Root-only attributes should be on root
25672599
self.verify_span_attribute(
@@ -2588,6 +2620,11 @@ def task_with_children(*, item, **kwargs):
25882620
LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID,
25892621
dataset_item_id,
25902622
)
2623+
self.verify_span_attribute(
2624+
first_root,
2625+
LangfuseOtelSpanAttributes.EXPERIMENT_ID,
2626+
result.experiment_id,
2627+
)
25912628

25922629
# Should have experiment metadata
25932630
self.verify_span_attribute(

0 commit comments

Comments
 (0)