Skip to content

Commit de21c5f

Browse files
committed
add scores
1 parent 231a84f commit de21c5f

6 files changed

Lines changed: 585 additions & 64 deletions

File tree

langfuse/_task_manager/media_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def __init__(
3939
def process_next_media_upload(self):
4040
try:
4141
upload_job = self._queue.get(block=True, timeout=1)
42-
self._log.debug(f"Processing upload for {upload_job['media_id']}")
42+
self._log.debug(f"Processing media upload for {upload_job['media_id']}")
4343
self._process_upload_media_job(data=upload_job)
4444

4545
self._queue.task_done()
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import json
2+
import logging
3+
import os
4+
import threading
5+
import time
6+
from queue import Empty, Queue
7+
from typing import Any, List, Optional
8+
9+
import backoff
10+
11+
from ..version import __version__ as langfuse_version
12+
13+
try:
14+
import pydantic.v1 as pydantic
15+
except ImportError:
16+
import pydantic
17+
18+
from langfuse.parse_error import handle_exception
19+
from langfuse.request import APIError, LangfuseClient
20+
from langfuse.serializer import EventSerializer
21+
22+
MAX_EVENT_SIZE_BYTES = int(os.environ.get("LANGFUSE_MAX_EVENT_SIZE_BYTES", 1_000_000))
23+
MAX_BATCH_SIZE_BYTES = int(os.environ.get("LANGFUSE_MAX_BATCH_SIZE_BYTES", 2_500_000))
24+
25+
26+
class ScoreIngestionMetadata(pydantic.BaseModel):
27+
batch_size: int
28+
sdk_name: str
29+
sdk_version: str
30+
public_key: str
31+
32+
33+
class ScoreIngestionConsumer(threading.Thread):
34+
_log = logging.getLogger("langfuse")
35+
36+
def __init__(
37+
self,
38+
*,
39+
ingestion_queue: Queue,
40+
identifier: int,
41+
client: LangfuseClient,
42+
public_key: str,
43+
flush_at: Optional[int] = None,
44+
flush_interval: Optional[float] = None,
45+
max_retries: Optional[int] = None,
46+
):
47+
"""Create a consumer thread."""
48+
super().__init__()
49+
# It's important to set running in the constructor: if we are asked to
50+
# pause immediately after construction, we might set running to True in
51+
# run() *after* we set it to False in pause... and keep running
52+
# forever.
53+
self.running = True
54+
# Make consumer a daemon thread so that it doesn't block program exit
55+
self.daemon = True
56+
self._ingestion_queue = ingestion_queue
57+
self._identifier = identifier
58+
self._client = client
59+
self._flush_at = flush_at or 15
60+
self._flush_interval = flush_interval or 1
61+
self._max_retries = max_retries or 3
62+
self._public_key = public_key
63+
64+
def _next(self):
65+
"""Return the next batch of items to upload."""
66+
events = []
67+
68+
start_time = time.monotonic()
69+
total_size = 0
70+
71+
while len(events) < self._flush_at:
72+
elapsed = time.monotonic() - start_time
73+
if elapsed >= self._flush_interval:
74+
break
75+
try:
76+
event = self._ingestion_queue.get(
77+
block=True, timeout=self._flush_interval - elapsed
78+
)
79+
80+
# convert pydantic models to dicts
81+
if "body" in event and isinstance(event["body"], pydantic.BaseModel):
82+
event["body"] = event["body"].dict(exclude_none=True)
83+
84+
item_size = self._get_item_size(event)
85+
86+
# check for serialization errors
87+
try:
88+
json.dumps(event, cls=EventSerializer)
89+
except Exception as e:
90+
self._log.error(f"Error serializing item, skipping: {e}")
91+
self._ingestion_queue.task_done()
92+
93+
continue
94+
95+
events.append(event)
96+
97+
total_size += item_size
98+
if total_size >= MAX_BATCH_SIZE_BYTES:
99+
self._log.debug("hit batch size limit (size: %d)", total_size)
100+
break
101+
102+
except Empty:
103+
break
104+
105+
except Exception as e:
106+
self._log.warning(
107+
"Failed to process event in ScoreIngestionConsumer, skipping",
108+
exc_info=e,
109+
)
110+
self._ingestion_queue.task_done()
111+
112+
return events
113+
114+
def _get_item_size(self, item: Any) -> int:
115+
"""Return the size of the item in bytes."""
116+
return len(json.dumps(item, cls=EventSerializer).encode())
117+
118+
def run(self):
119+
"""Run the consumer."""
120+
self._log.debug("consumer is running...")
121+
while self.running:
122+
self.upload()
123+
124+
def upload(self):
125+
"""Upload the next batch of items, return whether successful."""
126+
batch = self._next()
127+
if len(batch) == 0:
128+
return
129+
130+
try:
131+
self._upload_batch(batch)
132+
except Exception as e:
133+
handle_exception(e)
134+
finally:
135+
# mark items as acknowledged from queue
136+
for _ in batch:
137+
self._ingestion_queue.task_done()
138+
139+
def pause(self):
140+
"""Pause the consumer."""
141+
self.running = False
142+
143+
def _upload_batch(self, batch: List[Any]):
144+
self._log.debug("uploading batch of %d items", len(batch))
145+
146+
metadata = ScoreIngestionMetadata(
147+
batch_size=len(batch),
148+
sdk_name="python",
149+
sdk_version=langfuse_version,
150+
public_key=self._public_key,
151+
).dict()
152+
153+
@backoff.on_exception(
154+
backoff.expo, Exception, max_tries=self._max_retries, logger=None
155+
)
156+
def execute_task_with_backoff(batch: List[Any]):
157+
try:
158+
self._client.batch_post(batch=batch, metadata=metadata)
159+
except Exception as e:
160+
if (
161+
isinstance(e, APIError)
162+
and 400 <= int(e.status) < 500
163+
and int(e.status) != 429 # retry if rate-limited
164+
):
165+
return
166+
167+
raise e
168+
169+
execute_task_with_backoff(batch)
170+
self._log.debug(
171+
"successfully uploaded score event batch of size %d", len(batch)
172+
)

0 commit comments

Comments
 (0)