6 changes: 5 additions & 1 deletion langfuse/_task_manager/score_ingestion_consumer.py
@@ -11,6 +11,7 @@
 from langfuse._utils.parse_error import handle_exception
 from langfuse._utils.request import APIError, LangfuseClient
 from langfuse._utils.serializer import EventSerializer
+from langfuse.api.core.pydantic_utilities import UniversalBaseModel
 from langfuse.logger import langfuse_logger as logger
 
 from ..version import __version__ as langfuse_version
@@ -73,7 +74,10 @@ def _next(self) -> list:
 
         # convert pydantic models to dicts
         if "body" in event and isinstance(event["body"], BaseModel):
-            event["body"] = event["body"].model_dump(exclude_none=True)
+            if isinstance(event["body"], UniversalBaseModel):
+                event["body"] = event["body"].dict(exclude_none=True)
Comment on lines 75 to +78
🔴 The new code calls event["body"].dict(exclude_none=True) for UniversalBaseModel instances, but UniversalBaseModel.dict() in Pydantic V2 hardcodes exclude_none=False after **kwargs, overriding the caller's exclude_none=True; explicitly-set None fields (e.g. ScoreBody(trace_id=None)) will appear in the serialized API payload as null rather than being omitted. This is a regression from the previous model_dump(exclude_none=True) which correctly stripped all None values, and may cause server-side validation errors for users who explicitly pass None for optional score body fields. The fix should call model_dump(exclude_none=True, by_alias=True) directly instead of relying on UniversalBaseModel.dict().

Extended reasoning...

What the bug is and how it manifests

The PR changes score_ingestion_consumer.py line 77 to call event["body"].dict(exclude_none=True) for UniversalBaseModel instances instead of the old model_dump(exclude_none=True). The intent is correct — using .dict() enables alias-aware serialization (camelCase keys). However, UniversalBaseModel.dict() in Pydantic V2 does not reliably honor the caller's exclude_none=True for fields that were explicitly set to None.

The specific code path that triggers it

In langfuse/api/core/pydantic_utilities.py lines 135–151, UniversalBaseModel.dict() builds two model dump calls for Pydantic V2:

kwargs_with_defaults_exclude_unset = {
    **kwargs,            # caller's exclude_none=True is placed here...
    "by_alias": True,
    "exclude_unset": True,
    "exclude_none": False,  # ...but then unconditionally overridden here
}

Because "exclude_none": False is a literal key placed after **kwargs, it always wins. The first model_dump therefore runs with exclude_none=False regardless of what the caller passed.
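The override can be reproduced without pydantic at all: in a Python dict display, a literal key written after `**kwargs` always wins. A minimal sketch (the `dump_options` helper is hypothetical, not from the langfuse codebase):

```python
def dump_options(**kwargs):
    # Mimics how UniversalBaseModel.dict() assembles its model_dump kwargs:
    # literal keys placed after **kwargs silently override the caller's values.
    return {
        **kwargs,
        "by_alias": True,
        "exclude_unset": True,
        "exclude_none": False,  # always wins over a caller-supplied exclude_none
    }

opts = dump_options(exclude_none=True)
print(opts["exclude_none"])  # → False: the caller's True was discarded
```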

Why existing code doesn't prevent it

The deep-merge logic (deep_union_pydantic_dicts) then merges source (from the first dump, exclude_unset=True, exclude_none=False) into destination (from the second dump, exclude_none=True). For a field explicitly set to None: the first dump includes it (it was set, and exclude_none=False); the second dump omits it. The merge iterates over source keys and writes them into destination, so traceId: None ends up in the final output even though the caller asked for exclude_none=True.

Step-by-step proof

  1. User creates: ScoreBody(name="rating", value=1, trace_id=None)
  2. trace_id is now in the model's __fields_set__ (explicitly set).
  3. event["body"].dict(exclude_none=True) is called.
  4. First model_dump runs with {by_alias: True, exclude_unset: True, exclude_none: False} → returns {"name": "rating", "value": 1, "traceId": null}.
  5. Second model_dump runs with {by_alias: True, exclude_none: True, exclude_unset: False} → returns {"name": "rating", "value": 1} (traceId omitted).
  6. deep_union_pydantic_dicts(source, destination) copies traceId: None from source into destination.
  7. Final result: {"name": "rating", "value": 1, "traceId": null} — the None field was not excluded.
  8. Old code: model_dump(exclude_none=True) would have returned {"name": "rating", "value": 1} with no traceId key at all.
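The merge step in the proof above can be sketched with a simplified stand-in for deep_union_pydantic_dicts (an illustrative re-implementation under the stated assumptions, not the library's actual code):

```python
def deep_union(source: dict, destination: dict) -> dict:
    # Simplified sketch: every key in source is written into destination,
    # recursing into nested dicts, so source values (including None) win.
    for key, value in source.items():
        if isinstance(value, dict):
            deep_union(value, destination.setdefault(key, {}))
        else:
            destination[key] = value
    return destination

# First dump: exclude_unset=True, exclude_none=False -> keeps the explicit None
source = {"name": "rating", "value": 1, "traceId": None}
# Second dump: exclude_none=True -> traceId omitted
destination = {"name": "rating", "value": 1}

merged = deep_union(source, destination)
print(merged)  # → {'name': 'rating', 'value': 1, 'traceId': None}
```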

What the impact would be

When users explicitly set optional score body fields to None (a common pattern for clearing/defaulting fields), the null values are now included in the POST body sent to the Langfuse API. If the server performs strict schema validation and rejects unexpected null fields, these requests will fail — a regression that previously worked fine with model_dump(exclude_none=True). The added tests only use non-None field values, so this regression is not caught by the new test suite.

How to fix it

Replace event["body"].dict(exclude_none=True) with event["body"].model_dump(exclude_none=True, by_alias=True). This achieves both goals: alias-aware camelCase serialization and reliable exclusion of all None fields, bypassing the broken override in UniversalBaseModel.dict().

+            else:
+                event["body"] = event["body"].model_dump(exclude_none=True)
 
         item_size = self._get_item_size(event)
 
4 changes: 4 additions & 0 deletions langfuse/_utils/serializer.py
@@ -15,6 +15,7 @@

 from pydantic import BaseModel
 
+from langfuse.api.core.pydantic_utilities import UniversalBaseModel
 from langfuse.media import LangfuseMedia
 
 # Attempt to import Serializable
@@ -104,6 +105,9 @@ def default(self, obj: Any) -> Any:
             if isinstance(raw := getattr(obj, "raw", None), BaseModel):
                 raw.model_rebuild()
 
+            if isinstance(obj, UniversalBaseModel):
+                return obj.dict()
+
             return obj.model_dump()
 
         if isinstance(obj, Path):
70 changes: 67 additions & 3 deletions tests/test_serializer.py
@@ -4,13 +4,15 @@
 from datetime import date, datetime, timezone
 from enum import Enum
 from pathlib import Path
+from queue import Queue
+from unittest.mock import Mock
 from uuid import UUID
 
 from pydantic import BaseModel
 
-from langfuse._utils.serializer import (
-    EventSerializer,
-)
+from langfuse._task_manager.score_ingestion_consumer import ScoreIngestionConsumer
+from langfuse._utils.serializer import EventSerializer
+from langfuse.api import ScoreBody


class TestEnum(Enum):
@@ -69,6 +71,68 @@ def test_pydantic_model():
     assert json.loads(serializer.encode(model)) == {"field": "test"}
 
 
+def test_langfuse_model_uses_aliases():
+    model = ScoreBody(
+        id="score-1",
+        trace_id="trace-1",
+        session_id="session-1",
+        observation_id="observation-1",
+        dataset_run_id="dataset-run-1",
+        name="rating",
+        value=2,
+        data_type="NUMERIC",
+        config_id="config-1",
+    )
+    serializer = EventSerializer()
+
+    assert json.loads(serializer.encode(model)) == {
+        "id": "score-1",
+        "traceId": "trace-1",
+        "sessionId": "session-1",
+        "observationId": "observation-1",
+        "datasetRunId": "dataset-run-1",
+        "name": "rating",
+        "value": 2,
+        "dataType": "NUMERIC",
+        "configId": "config-1",
+    }


+def test_score_ingestion_consumer_uses_aliases_for_langfuse_models():
+    ingestion_queue = Queue()
+    consumer = ScoreIngestionConsumer(
+        ingestion_queue=ingestion_queue,
+        identifier=0,
+        client=Mock(),
+        public_key="pk-test",
+    )
+
+    ingestion_queue.put(
+        {
+            "id": "event-1",
+            "type": "score-create",
+            "timestamp": "2026-03-25T16:10:45.793Z",
+            "body": ScoreBody(
+                id="score-1",
Comment on lines +104 to +116
🟡 The new test test_score_ingestion_consumer_uses_aliases_for_langfuse_models blocks for ~1 second on each run because ScoreIngestionConsumer is constructed without flush_interval, so it defaults to 1.0s. Pass flush_interval=0.01 (or similar small value) to the constructor to avoid this unnecessary wait.

Extended reasoning...

What the bug is: test_score_ingestion_consumer_uses_aliases_for_langfuse_models constructs ScoreIngestionConsumer without specifying flush_interval, which therefore defaults to 1 second (self._flush_interval = flush_interval or 1). This causes every run of this test to block for approximately one second.

The specific code path: In _next(), the while loop condition is len(events) < self._flush_at (default 15). After successfully dequeuing the one item placed in the queue, len(events) becomes 1, which is still less than 15. The loop continues and computes elapsed = time.monotonic() - start_time, which is near zero (a few milliseconds). Since elapsed < flush_interval (1.0), the break condition is not triggered. The loop then calls self._ingestion_queue.get(block=True, timeout=self._flush_interval - elapsed), which is approximately 1.0 - 0.001 ≈ 0.999 seconds. No second item ever arrives, so an Empty exception is raised after ~1 second, finally breaking the loop.

Why existing code doesn't prevent it: The flush_interval guard is designed for production use where waiting a bit before flushing is acceptable. The test just forgets to set a small flush_interval value appropriate for a unit test that only needs to drain a single pre-populated item.

Step-by-step proof:

  1. ScoreIngestionConsumer is instantiated without flush_interval → self._flush_interval = 1.0
  2. One item is put into the queue
  3. consumer._next() is called
  4. Loop iteration 1: elapsed ≈ 0, queue.get(timeout≈1.0) retrieves the item; len(events) = 1
  5. Loop iteration 2: 1 < 15 is True; elapsed ≈ 0.001; queue.get(timeout≈0.999) blocks
  6. After ~0.999s, Empty is raised → loop breaks
  7. Test passes but took ~1 second unnecessarily

Impact: Each run of this new test adds approximately 1 second of unnecessary latency to the test suite. At CI scale this compounds across multiple runs.
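The timing behavior can be reproduced with a simplified stand-in for the _next() loop (a sketch of the drain-until-flush pattern described above, not the actual consumer code):

```python
import time
from queue import Empty, Queue

def drain(q: Queue, flush_at: int = 15, flush_interval: float = 1.0) -> list:
    # Simplified sketch of ScoreIngestionConsumer._next(): keep pulling items
    # until flush_at events are collected or flush_interval has elapsed.
    events = []
    start = time.monotonic()
    while len(events) < flush_at:
        elapsed = time.monotonic() - start
        if elapsed >= flush_interval:
            break
        try:
            events.append(q.get(block=True, timeout=flush_interval - elapsed))
        except Empty:
            break
    return events

q = Queue()
q.put("event-1")

start = time.monotonic()
# With the default flush_interval=1.0 this call would block for ~1s waiting
# for a second item; a small interval makes it return almost immediately.
batch = drain(q, flush_interval=0.01)
assert time.monotonic() - start < 0.5
print(batch)  # → ['event-1']
```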

How to fix: Pass flush_interval=0.01 (or any small value like 0.001) to the constructor in the test:

consumer = ScoreIngestionConsumer(
    ingestion_queue=ingestion_queue,
    identifier=0,
    client=Mock(),
    public_key="pk-test",
    flush_interval=0.01,  # add this
)

+                session_id="session-1",
+                name="rating",
+                value=2,
+                data_type="NUMERIC",
+            ),
+        }
+    )
+
+    batch = consumer._next()
+
+    assert batch[0]["body"] == {
+        "id": "score-1",
+        "sessionId": "session-1",
+        "name": "rating",
+        "value": 2,
+        "dataType": "NUMERIC",
+    }
 
 
 def test_path():
     path = Path("/tmp/test.txt")
     serializer = EventSerializer()