-
Notifications
You must be signed in to change notification settings - Fork 259
fix(experiments): fix unstable local experiment IDs for local data #1600
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2427,6 +2427,7 @@ def run_experiment( | |
| - run_name: The experiment run name. This is equal to the dataset run name if experiment was on Langfuse dataset. | ||
| - item_results: List of results for each processed item with outputs and evaluations | ||
| - run_evaluations: List of aggregate evaluation results for the entire run | ||
| - experiment_id: Stable identifier for the experiment run across all items | ||
| - dataset_run_id: ID of the dataset run (if using Langfuse datasets) | ||
| - dataset_run_url: Direct URL to view results in Langfuse UI (if applicable) | ||
|
|
||
|
|
@@ -2577,6 +2578,8 @@ async def _run_experiment_async( | |
| f"Starting experiment '{name}' run '{run_name}' with {len(data)} items" | ||
| ) | ||
|
|
||
| shared_fallback_experiment_id = self._create_observation_id() | ||
|
|
||
| # Set up concurrency control | ||
| semaphore = asyncio.Semaphore(max_concurrency) | ||
|
|
||
|
|
@@ -2588,6 +2591,7 @@ async def process_item(item: ExperimentItem) -> ExperimentItemResult: | |
| task, | ||
| evaluators, | ||
| composite_evaluator, | ||
| shared_fallback_experiment_id, | ||
| name, | ||
| run_name, | ||
| description, | ||
|
|
@@ -2619,7 +2623,14 @@ async def process_item(item: ExperimentItem) -> ExperimentItemResult: | |
| langfuse_logger.error(f"Run evaluator failed: {e}") | ||
|
|
||
| # Generate dataset run URL if applicable | ||
| dataset_run_id = valid_results[0].dataset_run_id if valid_results else None | ||
| dataset_run_id = next( | ||
| ( | ||
| result.dataset_run_id | ||
| for result in valid_results | ||
| if result.dataset_run_id | ||
| ), | ||
| None, | ||
| ) | ||
| dataset_run_url = None | ||
| if dataset_run_id and data: | ||
| try: | ||
|
|
@@ -2665,6 +2676,7 @@ async def process_item(item: ExperimentItem) -> ExperimentItemResult: | |
| description=description, | ||
| item_results=valid_results, | ||
| run_evaluations=run_evaluations, | ||
| experiment_id=dataset_run_id or shared_fallback_experiment_id, | ||
| dataset_run_id=dataset_run_id, | ||
| dataset_run_url=dataset_run_url, | ||
| ) | ||
|
|
@@ -2675,6 +2687,7 @@ async def _process_experiment_item( | |
| task: Callable, | ||
| evaluators: List[Callable], | ||
| composite_evaluator: Optional[CompositeEvaluatorFunction], | ||
| fallback_experiment_id: str, | ||
| experiment_name: str, | ||
| experiment_run_name: str, | ||
| experiment_description: Optional[str], | ||
|
|
@@ -2753,7 +2766,7 @@ async def _process_experiment_item( | |
| if isinstance(item_metadata, dict): | ||
| final_observation_metadata.update(item_metadata) | ||
|
|
||
| experiment_id = dataset_run_id or self._create_observation_id() | ||
| experiment_id = dataset_run_id or fallback_experiment_id | ||
| experiment_item_id = ( | ||
| dataset_item_id or get_sha256_hash_hex(_serialize(input_data))[:16] | ||
| ) | ||
|
Comment on lines
2766
to
2772
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Extended reasoning...What the bug is and how it manifests: The PR introduces `shared_fallback_experiment_id`, generated once per run and passed into every item, so locally-run items now share one stable experiment ID. However, each item still resolves its own `experiment_id = dataset_run_id or fallback_experiment_id` independently of the run-level result. The specific code path that triggers it: In a dataset-backed experiment where some items succeed and some fail to create their dataset run item, the failed items fall back to `shared_fallback_experiment_id` while successful items use the real `dataset_run_id` — so item spans within a single run can carry two different experiment IDs.
Why existing code doesn't prevent it: The `next(...)` lookup only selects the first truthy `dataset_run_id` for the run-level `ExperimentResult`; nothing back-fills that ID onto items that had already resolved to the fallback. What the impact is: Clients inspecting an item span's `experiment_id` may see an ID that does not match the run-level `experiment_id`. Addressing the refutation: The refutation correctly notes that this inconsistency existed pre-PR (failed items each got a unique fresh random ID, which was worse). The PR does improve matters: all failed items now share the same fallback ID. Step-by-step proof: (1) item A succeeds and receives `dataset_run_id`; (2) item B's run-item creation fails, so it uses the fallback; (3) the run-level result picks A's `dataset_run_id`; (4) B's span now disagrees with the run-level ID.
How to fix it: The cleanest fix is to resolve the authoritative `dataset_run_id` once, before tagging item spans (or re-tag fallback items once the run-level ID is known), so every item and the run-level result report the same experiment ID. |
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2384,6 +2384,7 @@ def task_with_child_spans(*, item, **kwargs): | |
| experiment_id = first_root["attributes"][ | ||
| LangfuseOtelSpanAttributes.EXPERIMENT_ID | ||
| ] | ||
| assert result.experiment_id == experiment_id | ||
| experiment_item_id = first_root["attributes"][ | ||
| LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID | ||
| ] | ||
|
|
@@ -2478,25 +2479,55 @@ def task_with_child_spans(*, item, **kwargs): | |
| LangfuseOtelSpanAttributes.EXPERIMENT_DATASET_ID, | ||
| ) | ||
|
|
||
| def test_experiment_id_is_stable_across_local_items( | ||
| self, langfuse_client, memory_exporter | ||
| ): | ||
| """Test local experiments reuse one experiment ID across all items.""" | ||
| local_data = [ | ||
| {"input": "test input 1", "expected_output": "expected result 1"}, | ||
| {"input": "test input 2", "expected_output": "expected result 2"}, | ||
| ] | ||
|
|
||
| result = langfuse_client.run_experiment( | ||
| name="Stable Local Experiment", | ||
| data=local_data, | ||
| task=lambda *, item, **kwargs: f"processed: {item['input']}", | ||
| ) | ||
|
|
||
| langfuse_client.flush() | ||
| time.sleep(0.1) | ||
|
|
||
| root_spans = self.get_spans_by_name(memory_exporter, "experiment-item-run") | ||
| experiment_ids = { | ||
| span["attributes"][LangfuseOtelSpanAttributes.EXPERIMENT_ID] | ||
| for span in root_spans | ||
| } | ||
|
|
||
| assert len(experiment_ids) == 1 | ||
| assert result.experiment_id == next(iter(experiment_ids)) | ||
|
|
||
| def test_experiment_attributes_propagate_with_dataset( | ||
| self, langfuse_client, memory_exporter, monkeypatch | ||
| ): | ||
| """Test experiment attribute propagation with Langfuse dataset.""" | ||
|
Comment on lines
+2507
to
2512
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 The new test_experiment_id_is_stable_across_local_items test asserts len(experiment_ids) == 1 without first verifying that both items actually produced spans, meaning the uniqueness check could trivially pass with only one span. Add assert len(root_spans) == 2 before the experiment_ids set construction to ensure the test genuinely validates cross-item stability. Extended reasoning...Missing span count assertion weakens the stability testThe test is designed to verify that a single, stable is propagated across all items in a local experiment run. It creates two local items and then asserts . However, it never asserts that before building the set. The specific gap: Why existing code does not prevent this: Step-by-step proof of the false-positive scenario:
Note: the 0-span edge case is already covered, since `assert len(experiment_ids) == 1` fails on an empty set — so only the 1-of-2 scenario slips through. Impact: a regression that silently drops one item's span would still pass this test, defeating its purpose of validating cross-item ID stability. Fix: add `assert len(root_spans) == 2` before constructing the `experiment_ids` set. |
||
|
|
||
| # Mock the async API to create dataset run items | ||
| async def mock_create_dataset_run_item(*args, **kwargs): | ||
| # Mock the sync API used by run_experiment to create dataset run items | ||
| def mock_create_dataset_run_item(*args, **kwargs): | ||
| from langfuse.api import DatasetRunItem | ||
|
|
||
| request = kwargs.get("request") | ||
| return DatasetRunItem( | ||
| id="mock-run-item-id", | ||
| dataset_run_id="mock-dataset-run-id-123", | ||
| dataset_item_id=request.datasetItemId if request else "mock-item-id", | ||
| dataset_run_name=kwargs.get("run_name", "Dataset Test"), | ||
| dataset_item_id=kwargs.get("dataset_item_id", "mock-item-id"), | ||
| trace_id="mock-trace-id", | ||
| observation_id=kwargs.get("observation_id"), | ||
| created_at=datetime.now(), | ||
| updated_at=datetime.now(), | ||
| ) | ||
|
|
||
| monkeypatch.setattr( | ||
| langfuse_client.async_api.dataset_run_items, | ||
| langfuse_client.api.dataset_run_items, | ||
| "create", | ||
| mock_create_dataset_run_item, | ||
| ) | ||
|
|
@@ -2548,7 +2579,7 @@ def task_with_children(*, item, **kwargs): | |
|
|
||
| # Run experiment | ||
| experiment_metadata = {"dataset_version": "v2", "test_run": "true"} | ||
| dataset.run_experiment( | ||
| result = dataset.run_experiment( | ||
| name="Dataset Test", | ||
| description="Dataset experiment description", | ||
| task=task_with_children, | ||
|
|
@@ -2562,6 +2593,7 @@ def task_with_children(*, item, **kwargs): | |
| root_spans = self.get_spans_by_name(memory_exporter, "experiment-item-run") | ||
| assert len(root_spans) >= 1, "Should have at least 1 root span" | ||
| first_root = root_spans[0] | ||
| assert result.experiment_id == "mock-dataset-run-id-123" | ||
|
|
||
| # Root-only attributes should be on root | ||
| self.verify_span_attribute( | ||
|
|
@@ -2588,6 +2620,11 @@ def task_with_children(*, item, **kwargs): | |
| LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID, | ||
| dataset_item_id, | ||
| ) | ||
| self.verify_span_attribute( | ||
| first_root, | ||
| LangfuseOtelSpanAttributes.EXPERIMENT_ID, | ||
| result.experiment_id, | ||
| ) | ||
|
|
||
| # Should have experiment metadata | ||
| self.verify_span_attribute( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.