Merged
17 changes: 15 additions & 2 deletions langfuse/_client/client.py
@@ -2427,6 +2427,7 @@ def run_experiment(
- run_name: The experiment run name. This equals the dataset run name if the experiment was run on a Langfuse dataset.
- item_results: List of results for each processed item with outputs and evaluations
- run_evaluations: List of aggregate evaluation results for the entire run
- experiment_id: Stable identifier for the experiment run across all items
- dataset_run_id: ID of the dataset run (if using Langfuse datasets)
- dataset_run_url: Direct URL to view results in Langfuse UI (if applicable)

@@ -2577,6 +2578,8 @@ async def _run_experiment_async(
f"Starting experiment '{name}' run '{run_name}' with {len(data)} items"
)

shared_fallback_experiment_id = self._create_observation_id()

# Set up concurrency control
semaphore = asyncio.Semaphore(max_concurrency)

@@ -2588,6 +2591,7 @@ async def process_item(item: ExperimentItem) -> ExperimentItemResult:
task,
evaluators,
composite_evaluator,
shared_fallback_experiment_id,
name,
run_name,
description,
@@ -2619,7 +2623,14 @@ async def process_item(item: ExperimentItem) -> ExperimentItemResult:
langfuse_logger.error(f"Run evaluator failed: {e}")

# Generate dataset run URL if applicable
dataset_run_id = valid_results[0].dataset_run_id if valid_results else None
dataset_run_id = next(
(
result.dataset_run_id
for result in valid_results
if result.dataset_run_id
),
None,
)
dataset_run_url = None
if dataset_run_id and data:
try:
@@ -2665,6 +2676,7 @@ async def process_item(item: ExperimentItem) -> ExperimentItemResult:
description=description,
item_results=valid_results,
run_evaluations=run_evaluations,
experiment_id=dataset_run_id or shared_fallback_experiment_id,
dataset_run_id=dataset_run_id,
dataset_run_url=dataset_run_url,
)
@@ -2675,6 +2687,7 @@ async def _process_experiment_item(
task: Callable,
evaluators: List[Callable],
composite_evaluator: Optional[CompositeEvaluatorFunction],
fallback_experiment_id: str,
experiment_name: str,
experiment_run_name: str,
experiment_description: Optional[str],
@@ -2753,7 +2766,7 @@ async def _process_experiment_item(
if isinstance(item_metadata, dict):
final_observation_metadata.update(item_metadata)

experiment_id = dataset_run_id or self._create_observation_id()
experiment_id = dataset_run_id or fallback_experiment_id
experiment_item_id = (
dataset_item_id or get_sha256_hash_hex(_serialize(input_data))[:16]
)
Comment on lines 2766 to 2772

🟡 When dataset_run_items.create fails for some (but not all) items, those failed items are tagged with fallback_experiment_id on their spans, while result.experiment_id returns the dataset_run_id from the successful items — violating the PR’s documented invariant that experiment_id is “stable across all items”. This only occurs in partial API failure scenarios, so the severity is low, but the new public field introduces an observable inconsistency that didn’t exist as a documented guarantee before.

Extended reasoning...

What the bug is and how it manifests

The PR introduces ExperimentResult.experiment_id with the documented guarantee that it is “propagated across all items.” In _process_experiment_item (line 2769), each item sets experiment_id = dataset_run_id or fallback_experiment_id. When the dataset_run_items.create API call succeeds, dataset_run_id is populated and used; when it raises, dataset_run_id remains None and the span is tagged with the per-run fallback_experiment_id instead. Meanwhile, _run_experiment_async derives result.experiment_id as dataset_run_id or shared_fallback_experiment_id (line 2679), where dataset_run_id comes from the first successful item result (line 2626–2633).

The specific code path that triggers it

In a dataset-backed experiment where some items succeed and some fail the dataset_run_items.create call:

  1. Successful item: dataset_run_id = "real-run-id-abc" → span attribute EXPERIMENT_ID = "real-run-id-abc"
  2. Failed item: dataset_run_id = None → span attribute EXPERIMENT_ID = fallback_experiment_id (e.g. "sdk-uuid-xyz")
  3. result.experiment_id = dataset_run_id or shared_fallback_experiment_id = "real-run-id-abc"

Why existing code doesn’t prevent it

The fallback_experiment_id is passed into _process_experiment_item and used as-is on failure — there is no post-processing step to retroactively update failed spans once the authoritative dataset_run_id becomes known from successful sibling items. The PR only resolves the local experiment case (no dataset involved), where all items correctly share the single shared_fallback_experiment_id.

What the impact is

Clients inspecting result.experiment_id and expecting it to match the EXPERIMENT_ID span attribute on every trace will observe a mismatch for failed items. Downstream tooling that joins spans by experiment_id (e.g., dashboards, evaluations) will miss the failed items. That said, this is an error-recovery path — the API call already failed, so the dataset linkage is already broken for those items regardless.

Addressing the refutation

The refutation correctly notes that this inconsistency existed pre-PR (failed items each got a unique fresh random ID, which was worse). The PR does improve matters: all failed items now share the same fallback_experiment_id instead of diverging IDs. However, the pre-PR code made no documented stability guarantee — the new result.experiment_id field explicitly documents the invariant “propagated across all items,” making the inconsistency observable and testable against the public contract.

Step-by-step proof

  1. Call run_experiment with a Langfuse dataset (2 items).
  2. Monkeypatch dataset_run_items.create to succeed for item 1 (returns dataset_run_id="real-id") and raise for item 2.
  3. After the run: result.experiment_id == "real-id" (from line 2679, successful item).
  4. Inspect spans: item 1 span has EXPERIMENT_ID = "real-id", item 2 span has EXPERIMENT_ID = "sdk-fallback-uuid" (the fallback_experiment_id).
  5. result.experiment_id != span[EXPERIMENT_ID] for item 2 — the documented invariant is broken.

How to fix it

The cleanest fix is to resolve the authoritative dataset_run_id across all items before tagging spans, or to patch failed spans post-hoc once dataset_run_id is known from any successful sibling. Alternatively, the docstring could be scoped: “stable across all successfully registered items.”
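The failure mode described above can be modeled in isolation. This is a minimal sketch with hypothetical names (resolve_experiment_ids and the literal IDs are illustrative, not the SDK's API) of the `dataset_run_id or fallback_experiment_id` resolution the comment discusses:

```python
import uuid

def resolve_experiment_ids(dataset_run_ids):
    """Model of the PR's ID resolution. dataset_run_ids holds the per-item
    outcome of dataset_run_items.create, with None where the call failed."""
    shared_fallback = uuid.uuid4().hex  # one fallback ID created per run
    # Per-item span attribute: dataset_run_id or fallback_experiment_id
    span_ids = [run_id or shared_fallback for run_id in dataset_run_ids]
    # Run-level result: first non-None dataset_run_id, else the fallback
    result_id = next(
        (run_id for run_id in dataset_run_ids if run_id), shared_fallback
    )
    return span_ids, result_id

# Partial failure: item 2's create call raised, so its span carries the
# fallback while result_id comes from the successful sibling item.
span_ids, result_id = resolve_experiment_ids(["real-run-id-abc", None])
assert result_id == "real-run-id-abc"
assert span_ids[1] != result_id  # the documented invariant breaks here

# Local experiment (no dataset): every item shares the one fallback ID.
span_ids, result_id = resolve_experiment_ids([None, None])
assert span_ids[0] == span_ids[1] == result_id
```

The local case at the bottom is the one the PR fixes; the partial-failure case is the residual inconsistency the comment flags.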

6 changes: 6 additions & 0 deletions langfuse/experiment.py
@@ -303,6 +303,9 @@ class ExperimentResult:
containing the original item, task output, evaluations, and trace information.
run_evaluations: List of aggregate evaluation results computed across all items,
such as average scores, statistical summaries, or cross-item analyses.
experiment_id: ID of the experiment run propagated across all items. For
Langfuse datasets, this matches the dataset run ID. For local experiments,
this is a stable SDK-generated identifier for the run.
dataset_run_id: Optional ID of the dataset run in Langfuse (when using Langfuse datasets).
dataset_run_url: Optional direct URL to view the experiment results in Langfuse UI.

@@ -361,6 +364,7 @@ def __init__(
description: Optional[str],
item_results: List[ExperimentItemResult],
run_evaluations: List[Evaluation],
experiment_id: str,
dataset_run_id: Optional[str] = None,
dataset_run_url: Optional[str] = None,
):
@@ -372,6 +376,7 @@ def __init__(
description: Optional description of the experiment.
item_results: List of results from processing individual dataset items.
run_evaluations: List of aggregate evaluation results for the entire run.
experiment_id: ID of the experiment run.
dataset_run_id: Optional ID of the dataset run (for Langfuse datasets).
dataset_run_url: Optional URL to view results in Langfuse UI.
"""
@@ -380,6 +385,7 @@ def __init__(
self.description = description
self.item_results = item_results
self.run_evaluations = run_evaluations
self.experiment_id = experiment_id
self.dataset_run_id = dataset_run_id
self.dataset_run_url = dataset_run_url

49 changes: 43 additions & 6 deletions tests/test_propagate_attributes.py
@@ -2384,6 +2384,7 @@ def task_with_child_spans(*, item, **kwargs):
experiment_id = first_root["attributes"][
LangfuseOtelSpanAttributes.EXPERIMENT_ID
]
assert result.experiment_id == experiment_id
experiment_item_id = first_root["attributes"][
LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID
]
@@ -2478,25 +2479,55 @@ def task_with_child_spans(*, item, **kwargs):
LangfuseOtelSpanAttributes.EXPERIMENT_DATASET_ID,
)

def test_experiment_id_is_stable_across_local_items(
self, langfuse_client, memory_exporter
):
"""Test local experiments reuse one experiment ID across all items."""
local_data = [
{"input": "test input 1", "expected_output": "expected result 1"},
{"input": "test input 2", "expected_output": "expected result 2"},
]

result = langfuse_client.run_experiment(
name="Stable Local Experiment",
data=local_data,
task=lambda *, item, **kwargs: f"processed: {item['input']}",
)

langfuse_client.flush()
time.sleep(0.1)

root_spans = self.get_spans_by_name(memory_exporter, "experiment-item-run")
experiment_ids = {
span["attributes"][LangfuseOtelSpanAttributes.EXPERIMENT_ID]
for span in root_spans
}

assert len(experiment_ids) == 1
assert result.experiment_id == next(iter(experiment_ids))

def test_experiment_attributes_propagate_with_dataset(
self, langfuse_client, memory_exporter, monkeypatch
):
"""Test experiment attribute propagation with Langfuse dataset."""
Comment on lines +2507 to 2512

🟡 The new test_experiment_id_is_stable_across_local_items test asserts len(experiment_ids) == 1 without first verifying that both items actually produced spans, meaning the uniqueness check could trivially pass with only one span. Add assert len(root_spans) == 2 before the experiment_ids set construction to ensure the test genuinely validates cross-item stability.

Extended reasoning...

Missing span count assertion weakens the stability test

The test is designed to verify that a single, stable experiment_id is propagated across all items in a local experiment run. It creates two local items and then asserts len(experiment_ids) == 1. However, it never asserts len(root_spans) == 2 before building the set.

The specific gap:
After get_spans_by_name returns root_spans, the code jumps directly to constructing experiment_ids and asserting its length is 1. If only 1 of the 2 items produced a span (due to a timing issue, race condition, or regression), the set would still contain exactly 1 element and the assertion would pass without actually validating cross-item stability.

Why existing code does not prevent this:
The flush() + time.sleep(0.1) pattern reduces but does not eliminate the possibility that one item span is missing. There is no assertion enforcing that both items must appear. A future regression silently dropping one item span would leave this test green.

Step-by-step proof of the false-positive scenario:

  1. run_experiment processes 2 items sharing the one fallback experiment ID.
  2. Due to a hypothetical timing issue, only item 1 span reaches the exporter; item 2 span is dropped.
  3. root_spans has length 1.
  4. experiment_ids contains that single span's ID, length 1.
  5. assert len(experiment_ids) == 1 passes, despite only one item being observed.
  6. The test name says across_local_items but only one item was checked.

Note: the 0-span edge case is covered since assert len(experiment_ids) == 1 fails on an empty set, so only the 1-of-2 scenario slips through.

Impact:
This is a test quality issue, not a production code bug. The fix of sharing shared_fallback_experiment_id across items is correct. The regression test could however pass silently even if that fix were broken, as long as only one span is emitted.

Fix:
Add assert len(root_spans) == 2 immediately after the get_spans_by_name call, before the experiment_ids set construction.


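The guarded-assertion pattern the comment recommends can be shown standalone. This sketch uses hypothetical span data and an assumed attribute key ("langfuse.experiment.id" stands in for LangfuseOtelSpanAttributes.EXPERIMENT_ID):

```python
# Hypothetical exported spans, one per experiment item.
root_spans = [
    {"attributes": {"langfuse.experiment.id": "run-abc"}},
    {"attributes": {"langfuse.experiment.id": "run-abc"}},
]

# Guard first: without this, a dropped span would leave a 1-element set
# and the stability assertion below would pass without testing anything.
assert len(root_spans) == 2, "Expected one root span per item"

experiment_ids = {
    span["attributes"]["langfuse.experiment.id"] for span in root_spans
}
assert len(experiment_ids) == 1  # all items share one experiment ID
```

The count guard turns a vacuously-passing uniqueness check into a genuine cross-item assertion.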
# Mock the async API to create dataset run items
async def mock_create_dataset_run_item(*args, **kwargs):
# Mock the sync API used by run_experiment to create dataset run items
def mock_create_dataset_run_item(*args, **kwargs):
from langfuse.api import DatasetRunItem

request = kwargs.get("request")
return DatasetRunItem(
id="mock-run-item-id",
dataset_run_id="mock-dataset-run-id-123",
dataset_item_id=request.datasetItemId if request else "mock-item-id",
dataset_run_name=kwargs.get("run_name", "Dataset Test"),
dataset_item_id=kwargs.get("dataset_item_id", "mock-item-id"),
trace_id="mock-trace-id",
observation_id=kwargs.get("observation_id"),
created_at=datetime.now(),
updated_at=datetime.now(),
)

monkeypatch.setattr(
langfuse_client.async_api.dataset_run_items,
langfuse_client.api.dataset_run_items,
"create",
mock_create_dataset_run_item,
)
@@ -2548,7 +2579,7 @@ def task_with_children(*, item, **kwargs):

# Run experiment
experiment_metadata = {"dataset_version": "v2", "test_run": "true"}
dataset.run_experiment(
result = dataset.run_experiment(
name="Dataset Test",
description="Dataset experiment description",
task=task_with_children,
@@ -2562,6 +2593,7 @@ def task_with_children(*, item, **kwargs):
root_spans = self.get_spans_by_name(memory_exporter, "experiment-item-run")
assert len(root_spans) >= 1, "Should have at least 1 root span"
first_root = root_spans[0]
assert result.experiment_id == "mock-dataset-run-id-123"

# Root-only attributes should be on root
self.verify_span_attribute(
@@ -2588,6 +2620,11 @@ def task_with_children(*, item, **kwargs):
LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID,
dataset_item_id,
)
self.verify_span_attribute(
first_root,
LangfuseOtelSpanAttributes.EXPERIMENT_ID,
result.experiment_id,
)

# Should have experiment metadata
self.verify_span_attribute(