Skip to content

Commit 317e705

Browse files
feat(dataset-versioning): support running versioned experiments (#1517)
* feat(dataset-versioning): support running versioned experiments
* test: add dataset item creation in versioned experiment test
1 parent 1ae2923 commit 317e705

File tree

3 files changed

+88
-3
lines changed

3 files changed

+88
-3
lines changed

langfuse/_client/client.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2483,7 +2483,7 @@ def get_dataset(
24832483

24842484
items = [DatasetItemClient(i, langfuse=self) for i in dataset_items]
24852485

2486-
return DatasetClient(dataset, items=items)
2486+
return DatasetClient(dataset, items=items, version=version)
24872487

24882488
except Error as e:
24892489
handle_fern_exception(e)
@@ -2574,6 +2574,7 @@ def run_experiment(
25742574
run_evaluators: List[RunEvaluatorFunction] = [],
25752575
max_concurrency: int = 50,
25762576
metadata: Optional[Dict[str, str]] = None,
2577+
_dataset_version: Optional[datetime] = None,
25772578
) -> ExperimentResult:
25782579
"""Run an experiment on a dataset with automatic tracing and evaluation.
25792580
@@ -2751,6 +2752,7 @@ def average_accuracy(*, item_results, **kwargs):
27512752
run_evaluators=run_evaluators or [],
27522753
max_concurrency=max_concurrency,
27532754
metadata=metadata,
2755+
dataset_version=_dataset_version,
27542756
),
27552757
),
27562758
)
@@ -2768,6 +2770,7 @@ async def _run_experiment_async(
27682770
run_evaluators: List[RunEvaluatorFunction],
27692771
max_concurrency: int,
27702772
metadata: Optional[Dict[str, Any]] = None,
2773+
dataset_version: Optional[datetime] = None,
27712774
) -> ExperimentResult:
27722775
langfuse_logger.debug(
27732776
f"Starting experiment '{name}' run '{run_name}' with {len(data)} items"
@@ -2788,6 +2791,7 @@ async def process_item(item: ExperimentItem) -> ExperimentItemResult:
27882791
run_name,
27892792
description,
27902793
metadata,
2794+
dataset_version,
27912795
)
27922796

27932797
# Run all items concurrently
@@ -2874,6 +2878,7 @@ async def _process_experiment_item(
28742878
experiment_run_name: str,
28752879
experiment_description: Optional[str],
28762880
experiment_metadata: Optional[Dict[str, Any]] = None,
2881+
dataset_version: Optional[datetime] = None,
28772882
) -> ExperimentItemResult:
28782883
span_name = "experiment-item-run"
28792884

@@ -2925,6 +2930,7 @@ async def _process_experiment_item(
29252930
datasetItemId=item.id, # type: ignore
29262931
traceId=trace_id,
29272932
observationId=span.id,
2933+
datasetVersion=dataset_version,
29282934
),
29292935
)
29302936

langfuse/_client/datasets.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ class DatasetClient:
155155
created_at (datetime): Timestamp of dataset creation.
156156
updated_at (datetime): Timestamp of the last update to the dataset.
157157
items (List[DatasetItemClient]): List of dataset items associated with the dataset.
158-
158+
version (Optional[datetime]): Timestamp of the dataset version.
159159
Example:
160160
Print the input of each dataset item in a dataset.
161161
```python
@@ -178,8 +178,14 @@ class DatasetClient:
178178
created_at: dt.datetime
179179
updated_at: dt.datetime
180180
items: List[DatasetItemClient]
181+
version: Optional[dt.datetime]
181182

182-
def __init__(self, dataset: Dataset, items: List[DatasetItemClient]):
183+
def __init__(
184+
self,
185+
dataset: Dataset,
186+
items: List[DatasetItemClient],
187+
version: Optional[dt.datetime] = None,
188+
):
183189
"""Initialize the DatasetClient."""
184190
self.id = dataset.id
185191
self.name = dataset.name
@@ -189,6 +195,7 @@ def __init__(self, dataset: Dataset, items: List[DatasetItemClient]):
189195
self.created_at = dataset.created_at
190196
self.updated_at = dataset.updated_at
191197
self.items = items
198+
self.version = version
192199
self._langfuse: Optional["Langfuse"] = None
193200

194201
def _get_langfuse_client(self) -> Optional["Langfuse"]:
@@ -421,4 +428,5 @@ def content_diversity(*, item_results, **kwargs):
421428
run_evaluators=run_evaluators,
422429
max_concurrency=max_concurrency,
423430
metadata=metadata,
431+
_dataset_version=self.version,
424432
)

tests/test_datasets.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,3 +569,74 @@ def test_get_dataset_with_version():
569569
# Verify fetching without version returns both items (latest)
570570
dataset_latest = langfuse.get_dataset(name)
571571
assert len(dataset_latest.items) == 2
572+
573+
574+
def test_run_experiment_with_versioned_dataset():
575+
"""Test that running an experiment on a versioned dataset works correctly."""
576+
from datetime import timedelta
577+
import time
578+
579+
langfuse = Langfuse(debug=False)
580+
581+
# Create dataset
582+
name = create_uuid()
583+
langfuse.create_dataset(name=name)
584+
585+
# Create first item
586+
langfuse.create_dataset_item(
587+
dataset_name=name, input={"question": "What is 2+2?"}, expected_output=4
588+
)
589+
langfuse.flush()
590+
time.sleep(3)
591+
592+
# Fetch dataset to get the actual server-assigned timestamp of item1
593+
dataset_after_item1 = langfuse.get_dataset(name)
594+
assert len(dataset_after_item1.items) == 1
595+
item1_id = dataset_after_item1.items[0].id
596+
item1_created_at = dataset_after_item1.items[0].created_at
597+
598+
# Use a timestamp 1 second after item1's creation
599+
version_timestamp = item1_created_at + timedelta(seconds=1)
600+
time.sleep(3)
601+
602+
# Update item1 after the version timestamp (this should not affect versioned query)
603+
langfuse.create_dataset_item(
604+
id=item1_id,
605+
dataset_name=name,
606+
input={"question": "What is 4+4?"},
607+
expected_output=8,
608+
)
609+
langfuse.flush()
610+
time.sleep(3)
611+
612+
# Create second item (after version timestamp)
613+
langfuse.create_dataset_item(
614+
dataset_name=name, input={"question": "What is 3+3?"}, expected_output=6
615+
)
616+
langfuse.flush()
617+
time.sleep(3)
618+
619+
# Get versioned dataset (should only have first item with ORIGINAL state)
620+
versioned_dataset = langfuse.get_dataset(name, version=version_timestamp)
621+
assert len(versioned_dataset.items) == 1
622+
assert versioned_dataset.version == version_timestamp
623+
# Verify it returns the ORIGINAL version of item1 (before the update)
624+
assert versioned_dataset.items[0].input == {"question": "What is 2+2?"}
625+
assert versioned_dataset.items[0].expected_output == 4
626+
assert versioned_dataset.items[0].id == item1_id
627+
628+
# Run a simple experiment on the versioned dataset
629+
def simple_task(*, item, **kwargs):
630+
# Just return a static answer
631+
return item.expected_output
632+
633+
result = versioned_dataset.run_experiment(
634+
name="Versioned Dataset Test",
635+
description="Testing experiment with versioned dataset",
636+
task=simple_task,
637+
)
638+
639+
# Verify experiment ran successfully
640+
assert result.name == "Versioned Dataset Test"
641+
assert len(result.item_results) == 1 # Only one item in versioned dataset
642+
assert result.item_results[0].output == 4

0 commit comments

Comments
 (0)