diff --git a/langfuse/_task_manager/score_ingestion_consumer.py b/langfuse/_task_manager/score_ingestion_consumer.py index 0aedaa7a5..7a256004d 100644 --- a/langfuse/_task_manager/score_ingestion_consumer.py +++ b/langfuse/_task_manager/score_ingestion_consumer.py @@ -11,6 +11,7 @@ from langfuse._utils.parse_error import handle_exception from langfuse._utils.request import APIError, LangfuseClient from langfuse._utils.serializer import EventSerializer +from langfuse.api.core.pydantic_utilities import UniversalBaseModel from langfuse.logger import langfuse_logger as logger from ..version import __version__ as langfuse_version @@ -73,7 +74,10 @@ def _next(self) -> list: # convert pydantic models to dicts if "body" in event and isinstance(event["body"], BaseModel): - event["body"] = event["body"].model_dump(exclude_none=True) + if isinstance(event["body"], UniversalBaseModel): + event["body"] = event["body"].dict(exclude_none=True) + else: + event["body"] = event["body"].model_dump(exclude_none=True) item_size = self._get_item_size(event) diff --git a/langfuse/_utils/serializer.py b/langfuse/_utils/serializer.py index c2dad3312..80ba27850 100644 --- a/langfuse/_utils/serializer.py +++ b/langfuse/_utils/serializer.py @@ -15,6 +15,7 @@ from pydantic import BaseModel +from langfuse.api.core.pydantic_utilities import UniversalBaseModel from langfuse.media import LangfuseMedia # Attempt to import Serializable @@ -104,6 +105,9 @@ def default(self, obj: Any) -> Any: if isinstance(raw := getattr(obj, "raw", None), BaseModel): raw.model_rebuild() + if isinstance(obj, UniversalBaseModel): + return obj.dict() + return obj.model_dump() if isinstance(obj, Path): diff --git a/tests/test_serializer.py b/tests/test_serializer.py index 4faf7019b..5e82f1b0a 100644 --- a/tests/test_serializer.py +++ b/tests/test_serializer.py @@ -4,13 +4,15 @@ from datetime import date, datetime, timezone from enum import Enum from pathlib import Path +from queue import Queue +from unittest.mock import Mock from uuid import UUID from pydantic import BaseModel -from langfuse._utils.serializer import ( - EventSerializer, -) +from langfuse._task_manager.score_ingestion_consumer import ScoreIngestionConsumer +from langfuse._utils.serializer import EventSerializer +from langfuse.api import ScoreBody class TestEnum(Enum): @@ -69,6 +71,68 @@ def test_pydantic_model(): assert json.loads(serializer.encode(model)) == {"field": "test"} +def test_langfuse_model_uses_aliases(): + model = ScoreBody( + id="score-1", + trace_id="trace-1", + session_id="session-1", + observation_id="observation-1", + dataset_run_id="dataset-run-1", + name="rating", + value=2, + data_type="NUMERIC", + config_id="config-1", + ) + serializer = EventSerializer() + + assert json.loads(serializer.encode(model)) == { + "id": "score-1", + "traceId": "trace-1", + "sessionId": "session-1", + "observationId": "observation-1", + "datasetRunId": "dataset-run-1", + "name": "rating", + "value": 2, + "dataType": "NUMERIC", + "configId": "config-1", + } + + +def test_score_ingestion_consumer_uses_aliases_for_langfuse_models(): + ingestion_queue = Queue() + consumer = ScoreIngestionConsumer( + ingestion_queue=ingestion_queue, + identifier=0, + client=Mock(), + public_key="pk-test", + ) + + ingestion_queue.put( + { + "id": "event-1", + "type": "score-create", + "timestamp": "2026-03-25T16:10:45.793Z", + "body": ScoreBody( + id="score-1", + session_id="session-1", + name="rating", + value=2, + data_type="NUMERIC", + ), + } + ) + + batch = consumer._next() + + assert batch[0]["body"] == { + "id": "score-1", + "sessionId": "session-1", + "name": "rating", + "value": 2, + "dataType": "NUMERIC", + } + + def test_path(): path = Path("/tmp/test.txt") serializer = EventSerializer()