-
Notifications
You must be signed in to change notification settings - Fork 259
fix: preserve score body aliases during ingestion #1581
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,13 +4,15 @@ | |
| from datetime import date, datetime, timezone | ||
| from enum import Enum | ||
| from pathlib import Path | ||
| from queue import Queue | ||
| from unittest.mock import Mock | ||
| from uuid import UUID | ||
|
|
||
| from pydantic import BaseModel | ||
|
|
||
| from langfuse._utils.serializer import ( | ||
| EventSerializer, | ||
| ) | ||
| from langfuse._task_manager.score_ingestion_consumer import ScoreIngestionConsumer | ||
| from langfuse._utils.serializer import EventSerializer | ||
| from langfuse.api import ScoreBody | ||
|
|
||
|
|
||
| class TestEnum(Enum): | ||
|
|
@@ -69,6 +71,68 @@ def test_pydantic_model(): | |
| assert json.loads(serializer.encode(model)) == {"field": "test"} | ||
|
|
||
|
|
||
| def test_langfuse_model_uses_aliases(): | ||
| model = ScoreBody( | ||
| id="score-1", | ||
| trace_id="trace-1", | ||
| session_id="session-1", | ||
| observation_id="observation-1", | ||
| dataset_run_id="dataset-run-1", | ||
| name="rating", | ||
| value=2, | ||
| data_type="NUMERIC", | ||
| config_id="config-1", | ||
| ) | ||
| serializer = EventSerializer() | ||
|
|
||
| assert json.loads(serializer.encode(model)) == { | ||
| "id": "score-1", | ||
| "traceId": "trace-1", | ||
| "sessionId": "session-1", | ||
| "observationId": "observation-1", | ||
| "datasetRunId": "dataset-run-1", | ||
| "name": "rating", | ||
| "value": 2, | ||
| "dataType": "NUMERIC", | ||
| "configId": "config-1", | ||
| } | ||
|
|
||
|
|
||
| def test_score_ingestion_consumer_uses_aliases_for_langfuse_models(): | ||
| ingestion_queue = Queue() | ||
| consumer = ScoreIngestionConsumer( | ||
| ingestion_queue=ingestion_queue, | ||
| identifier=0, | ||
| client=Mock(), | ||
| public_key="pk-test", | ||
| ) | ||
|
|
||
| ingestion_queue.put( | ||
| { | ||
| "id": "event-1", | ||
| "type": "score-create", | ||
| "timestamp": "2026-03-25T16:10:45.793Z", | ||
| "body": ScoreBody( | ||
| id="score-1", | ||
|
Comment on lines
+104
to
+116
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 The new test Extended reasoning...What the bug is: The specific code path: In Why existing code doesn't prevent it: The Step-by-step proof:
Impact: Each run of this new test adds approximately 1 second of unnecessary latency to the test suite. At CI scale this compounds across multiple runs. How to fix: Pass consumer = ScoreIngestionConsumer(
ingestion_queue=ingestion_queue,
identifier=0,
client=Mock(),
public_key="pk-test",
flush_interval=0.01, # add this
) |
||
| session_id="session-1", | ||
| name="rating", | ||
| value=2, | ||
| data_type="NUMERIC", | ||
| ), | ||
| } | ||
| ) | ||
|
|
||
| batch = consumer._next() | ||
|
|
||
| assert batch[0]["body"] == { | ||
| "id": "score-1", | ||
| "sessionId": "session-1", | ||
| "name": "rating", | ||
| "value": 2, | ||
| "dataType": "NUMERIC", | ||
| } | ||
|
|
||
|
|
||
| def test_path(): | ||
| path = Path("/tmp/test.txt") | ||
| serializer = EventSerializer() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴 The new code calls
event["body"].dict(exclude_none=True)forUniversalBaseModelinstances, butUniversalBaseModel.dict()in Pydantic V2 hardcodesexclude_none=Falseafter**kwargs, overriding the caller'sexclude_none=True; explicitly-setNonefields (e.g.ScoreBody(trace_id=None)) will appear in the serialized API payload asnullrather than being omitted. This is a regression from the previousmodel_dump(exclude_none=True)which correctly stripped allNonevalues, and may cause server-side validation errors for users who explicitly passNonefor optional score body fields. The fix should callmodel_dump(exclude_none=True, by_alias=True)directly instead of relying onUniversalBaseModel.dict().Extended reasoning...
What the bug is and how it manifests
The PR changes
score_ingestion_consumer.pyline 77 to callevent["body"].dict(exclude_none=True)forUniversalBaseModelinstances instead of the oldmodel_dump(exclude_none=True). The intent is correct — using.dict()enables alias-aware serialization (camelCase keys). However,UniversalBaseModel.dict()in Pydantic V2 does not reliably honor the caller'sexclude_none=Truefor fields that were explicitly set toNone.The specific code path that triggers it
In
langfuse/api/core/pydantic_utilities.pylines 135–151,UniversalBaseModel.dict()builds two model dump calls for Pydantic V2:Because
"exclude_none": Falseis a literal key placed after**kwargs, it always wins. The firstmodel_dumptherefore runs withexclude_none=Falseregardless of what the caller passed.Why existing code doesn't prevent it
The deep-merge logic (
deep_union_pydantic_dicts) then merges source (from the first dump,exclude_unset=True, exclude_none=False) into destination (from the second dump,exclude_none=True). For a field explicitly set toNone: the first dump includes it (it was set, andexclude_none=False); the second dump omits it. The merge iterates over source keys and writes them into destination, sotraceId: Noneends up in the final output even though the caller asked forexclude_none=True.Step-by-step proof
ScoreBody(name="rating", value=1, trace_id=None)trace_idis now in the model's__fields_set__(explicitly set).event["body"].dict(exclude_none=True)is called.model_dumpruns with{by_alias: True, exclude_unset: True, exclude_none: False}→ returns{"name": "rating", "value": 1, "traceId": null}.model_dumpruns with{by_alias: True, exclude_none: True, exclude_unset: False}→ returns{"name": "rating", "value": 1}(traceId omitted).deep_union_pydantic_dicts(source, destination)copiestraceId: Nonefrom source into destination.{"name": "rating", "value": 1, "traceId": null}— theNonefield was not excluded.model_dump(exclude_none=True)would have returned{"name": "rating", "value": 1}with notraceIdkey at all.What the impact would be
When users explicitly set optional score body fields to
None(a common pattern for clearing/defaulting fields), the null values are now included in the POST body sent to the Langfuse API. If the server performs strict schema validation and rejects unexpected null fields, these requests will fail — a regression that previously worked fine withmodel_dump(exclude_none=True). The added tests only use non-None field values, so this regression is not caught by the new test suite.How to fix it
Replace
event["body"].dict(exclude_none=True)withevent["body"].model_dump(exclude_none=True, by_alias=True). This achieves both goals: alias-aware camelCase serialization and reliable exclusion of allNonefields, bypassing the broken override inUniversalBaseModel.dict().