Skip to content

Commit 9efcca0

Browse files
authored
fix: apply stricter early routing for base64 media to prevent SSE dat… (#1544)
1 parent 96e8189 commit 9efcca0

File tree

2 files changed

+122
-1
lines changed

2 files changed

+122
-1
lines changed

langfuse/_task_manager/media_manager.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,12 @@ def _process_data_recursively(data: Any, level: int) -> Any:
8686

8787
return data
8888

89-
if isinstance(data, str) and data.startswith("data:"):
89+
if (
90+
isinstance(data, str)
91+
and data.startswith("data:")
92+
and "," in data
93+
and data.split(",", 1)[0].endswith(";base64")
94+
):
9095
media = LangfuseMedia(
9196
obj=data,
9297
base64_data_uri=data,

tests/test_media_manager.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import pytest
77

88
from langfuse._task_manager.media_manager import MediaManager
9+
from langfuse.media import LangfuseMedia
910

1011

1112
def _upload_response(status_code: int, text: str = "") -> httpx.Response:
@@ -78,3 +79,118 @@ def test_media_upload_gives_up_on_non_retryable_http_status():
7879
assert httpx_client.put.call_count == 1
7980
media_api.patch.assert_called_once()
8081
assert media_api.patch.call_args.kwargs["upload_http_status"] == 403
82+
83+
84+
def test_find_and_process_media_sse_done_passes_through():
    """The SSE terminator ``data: [DONE]`` comes back unchanged and queues no upload."""
    upload_queue = Queue()
    media_manager = MediaManager(
        api_client=SimpleNamespace(media=Mock()),
        httpx_client=Mock(),
        media_upload_queue=upload_queue,
    )
    sse_terminator = "data: [DONE]"

    processed = media_manager._find_and_process_media(
        data=sse_terminator,
        trace_id="trace-id",
        observation_id=None,
        field="output",
    )

    # The string starts with "data:" but is not a base64 data URI.
    assert processed == sse_terminator
    assert upload_queue.empty()
99+
100+
101+
def test_find_and_process_media_sse_json_payload_passes_through():
    """SSE lines carrying JSON come back unchanged and queue no upload.

    The second payload mentions ``;base64`` inside its text, and contains a
    comma, without being a data URI.
    """
    upload_queue = Queue()
    media_manager = MediaManager(
        api_client=SimpleNamespace(media=Mock()),
        httpx_client=Mock(),
        media_upload_queue=upload_queue,
    )
    sse_payloads = (
        'data: {"choices": [{"delta": {"content": "hello"}}]}',
        'data: {"text": "what is ;base64 encoding?", "count": 1}',
    )

    for payload in sse_payloads:
        processed = media_manager._find_and_process_media(
            data=payload,
            trace_id="trace-id",
            observation_id=None,
            field="output",
        )
        assert processed == payload

    assert upload_queue.empty()
121+
122+
123+
def test_find_and_process_media_valid_base64_uri_is_processed():
    """A ``data:<mime>;base64,<payload>`` string yields a LangfuseMedia and a queued upload."""
    upload_queue = Queue()
    media_manager = MediaManager(
        api_client=SimpleNamespace(media=Mock()),
        httpx_client=Mock(),
        media_upload_queue=upload_queue,
    )
    jpeg_data_uri = "data:image/jpeg;base64,/9j/4AAQSkZJRgABAQAAAQABAAD/4QBARXhpZgAA"

    processed = media_manager._find_and_process_media(
        data=jpeg_data_uri,
        trace_id="trace-id",
        observation_id=None,
        field="input",
    )

    assert isinstance(processed, LangfuseMedia)
    # Something must have been placed on the upload queue.
    assert not upload_queue.empty()
143+
144+
145+
def test_find_and_process_media_data_uri_without_comma_passes_through():
    """A data-URI header with no comma (so no payload) comes back unchanged and queues no upload."""
    upload_queue = Queue()
    media_manager = MediaManager(
        api_client=SimpleNamespace(media=Mock()),
        httpx_client=Mock(),
        media_upload_queue=upload_queue,
    )
    header_only_uri = "data:image/png;base64"

    processed = media_manager._find_and_process_media(
        data=header_only_uri,
        trace_id="trace-id",
        observation_id=None,
        field="input",
    )

    assert processed == header_only_uri
    assert upload_queue.empty()
160+
161+
162+
def test_find_and_process_media_non_base64_data_uri_passes_through():
    """A data URI without the ``;base64`` marker comes back unchanged and queues no upload."""
    upload_queue = Queue()
    media_manager = MediaManager(
        api_client=SimpleNamespace(media=Mock()),
        httpx_client=Mock(),
        media_upload_queue=upload_queue,
    )
    plain_text_uri = "data:text/plain,hello world"

    processed = media_manager._find_and_process_media(
        data=plain_text_uri,
        trace_id="trace-id",
        observation_id=None,
        field="input",
    )

    assert processed == plain_text_uri
    assert upload_queue.empty()
177+
178+
179+
def test_find_and_process_media_sse_in_nested_structure_passes_through():
    """SSE-style strings nested inside a list of dicts come back equal to the input and queue no upload."""
    upload_queue = Queue()
    media_manager = MediaManager(
        api_client=SimpleNamespace(media=Mock()),
        httpx_client=Mock(),
        media_upload_queue=upload_queue,
    )
    messages = [
        {"role": "assistant", "content": "data: [DONE]"},
        {"role": "user", "content": "data: hello"},
    ]

    processed = media_manager._find_and_process_media(
        data=messages,
        trace_id="trace-id",
        observation_id=None,
        field="output",
    )

    # Equality (not identity) is what the check requires of the result.
    assert processed == messages
    assert upload_queue.empty()

0 commit comments

Comments
 (0)