diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 9beed5f67..85ec83a4e 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -2427,6 +2427,7 @@ def run_experiment( - run_name: The experiment run name. This is equal to the dataset run name if experiment was on Langfuse dataset. - item_results: List of results for each processed item with outputs and evaluations - run_evaluations: List of aggregate evaluation results for the entire run + - experiment_id: Stable identifier for the experiment run across all items - dataset_run_id: ID of the dataset run (if using Langfuse datasets) - dataset_run_url: Direct URL to view results in Langfuse UI (if applicable) @@ -2577,6 +2578,8 @@ async def _run_experiment_async( f"Starting experiment '{name}' run '{run_name}' with {len(data)} items" ) + shared_fallback_experiment_id = self._create_observation_id() + # Set up concurrency control semaphore = asyncio.Semaphore(max_concurrency) @@ -2588,6 +2591,7 @@ async def process_item(item: ExperimentItem) -> ExperimentItemResult: task, evaluators, composite_evaluator, + shared_fallback_experiment_id, name, run_name, description, @@ -2619,7 +2623,14 @@ async def process_item(item: ExperimentItem) -> ExperimentItemResult: langfuse_logger.error(f"Run evaluator failed: {e}") # Generate dataset run URL if applicable - dataset_run_id = valid_results[0].dataset_run_id if valid_results else None + dataset_run_id = next( + ( + result.dataset_run_id + for result in valid_results + if result.dataset_run_id + ), + None, + ) dataset_run_url = None if dataset_run_id and data: try: @@ -2665,6 +2676,7 @@ async def process_item(item: ExperimentItem) -> ExperimentItemResult: description=description, item_results=valid_results, run_evaluations=run_evaluations, + experiment_id=dataset_run_id or shared_fallback_experiment_id, dataset_run_id=dataset_run_id, dataset_run_url=dataset_run_url, ) @@ -2675,6 +2687,7 @@ async def _process_experiment_item( task: Callable, evaluators: List[Callable], composite_evaluator: Optional[CompositeEvaluatorFunction], + fallback_experiment_id: str, experiment_name: str, experiment_run_name: str, experiment_description: Optional[str], @@ -2753,7 +2766,7 @@ async def _process_experiment_item( if isinstance(item_metadata, dict): final_observation_metadata.update(item_metadata) - experiment_id = dataset_run_id or self._create_observation_id() + experiment_id = dataset_run_id or fallback_experiment_id experiment_item_id = ( dataset_item_id or get_sha256_hash_hex(_serialize(input_data))[:16] ) diff --git a/langfuse/experiment.py b/langfuse/experiment.py index ff707fc9b..6e4b32e10 100644 --- a/langfuse/experiment.py +++ b/langfuse/experiment.py @@ -303,6 +303,9 @@ class ExperimentResult: containing the original item, task output, evaluations, and trace information. run_evaluations: List of aggregate evaluation results computed across all items, such as average scores, statistical summaries, or cross-item analyses. + experiment_id: ID of the experiment run propagated across all items. For + Langfuse datasets, this matches the dataset run ID. For local experiments, + this is a stable SDK-generated identifier for the run. dataset_run_id: Optional ID of the dataset run in Langfuse (when using Langfuse datasets). dataset_run_url: Optional direct URL to view the experiment results in Langfuse UI. @@ -361,6 +364,7 @@ def __init__( description: Optional[str], item_results: List[ExperimentItemResult], run_evaluations: List[Evaluation], + experiment_id: str, dataset_run_id: Optional[str] = None, dataset_run_url: Optional[str] = None, ): @@ -372,6 +376,7 @@ def __init__( description: Optional description of the experiment. item_results: List of results from processing individual dataset items. run_evaluations: List of aggregate evaluation results for the entire run. + experiment_id: ID of the experiment run. dataset_run_id: Optional ID of the dataset run (for Langfuse datasets). dataset_run_url: Optional URL to view results in Langfuse UI. """ @@ -380,6 +385,7 @@ def __init__( self.description = description self.item_results = item_results self.run_evaluations = run_evaluations + self.experiment_id = experiment_id self.dataset_run_id = dataset_run_id self.dataset_run_url = dataset_run_url diff --git a/tests/test_propagate_attributes.py b/tests/test_propagate_attributes.py index b3be9f830..d771a3d74 100644 --- a/tests/test_propagate_attributes.py +++ b/tests/test_propagate_attributes.py @@ -2384,6 +2384,7 @@ def task_with_child_spans(*, item, **kwargs): experiment_id = first_root["attributes"][ LangfuseOtelSpanAttributes.EXPERIMENT_ID ] + assert result.experiment_id == experiment_id experiment_item_id = first_root["attributes"][ LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID ] @@ -2478,25 +2479,55 @@ def task_with_child_spans(*, item, **kwargs): LangfuseOtelSpanAttributes.EXPERIMENT_DATASET_ID, ) + def test_experiment_id_is_stable_across_local_items( + self, langfuse_client, memory_exporter + ): + """Test local experiments reuse one experiment ID across all items.""" + local_data = [ + {"input": "test input 1", "expected_output": "expected result 1"}, + {"input": "test input 2", "expected_output": "expected result 2"}, + ] + + result = langfuse_client.run_experiment( + name="Stable Local Experiment", + data=local_data, + task=lambda *, item, **kwargs: f"processed: {item['input']}", + ) + + langfuse_client.flush() + time.sleep(0.1) + + root_spans = self.get_spans_by_name(memory_exporter, "experiment-item-run") + experiment_ids = { + span["attributes"][LangfuseOtelSpanAttributes.EXPERIMENT_ID] + for span in root_spans + } + + assert len(experiment_ids) == 1 + assert result.experiment_id == next(iter(experiment_ids)) + def test_experiment_attributes_propagate_with_dataset( self, langfuse_client, memory_exporter, monkeypatch ): """Test experiment attribute propagation with Langfuse dataset.""" - # Mock the async API to create dataset run items - async def mock_create_dataset_run_item(*args, **kwargs): + # Mock the sync API used by run_experiment to create dataset run items + def mock_create_dataset_run_item(*args, **kwargs): from langfuse.api import DatasetRunItem - request = kwargs.get("request") return DatasetRunItem( id="mock-run-item-id", dataset_run_id="mock-dataset-run-id-123", - dataset_item_id=request.datasetItemId if request else "mock-item-id", + dataset_run_name=kwargs.get("run_name", "Dataset Test"), + dataset_item_id=kwargs.get("dataset_item_id", "mock-item-id"), trace_id="mock-trace-id", + observation_id=kwargs.get("observation_id"), + created_at=datetime.now(), + updated_at=datetime.now(), ) monkeypatch.setattr( - langfuse_client.async_api.dataset_run_items, + langfuse_client.api.dataset_run_items, "create", mock_create_dataset_run_item, ) @@ -2548,7 +2579,7 @@ def task_with_children(*, item, **kwargs): # Run experiment experiment_metadata = {"dataset_version": "v2", "test_run": "true"} - dataset.run_experiment( + result = dataset.run_experiment( name="Dataset Test", description="Dataset experiment description", task=task_with_children, @@ -2562,6 +2593,7 @@ def task_with_children(*, item, **kwargs): root_spans = self.get_spans_by_name(memory_exporter, "experiment-item-run") assert len(root_spans) >= 1, "Should have at least 1 root span" first_root = root_spans[0] + assert result.experiment_id == "mock-dataset-run-id-123" # Root-only attributes should be on root self.verify_span_attribute( @@ -2588,6 +2620,11 @@ def task_with_children(*, item, **kwargs): LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID, dataset_item_id, ) + self.verify_span_attribute( + first_root, + LangfuseOtelSpanAttributes.EXPERIMENT_ID, + result.experiment_id, + ) # Should have experiment metadata self.verify_span_attribute(