Skip to content

Commit f247b34

Browse files
authored
feature(openai): support API streaming (#178)
1 parent b3293f3 commit f247b34

5 files changed

Lines changed: 290 additions & 30 deletions

File tree

langfuse/openai.py

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import threading
22
from datetime import datetime
3+
import types
34
from typing import Optional
45

56

@@ -71,16 +72,14 @@ def get_openai_args(self):
7172
return self.kwargs
7273

7374

74-
def _with_tracer_wrapper(func):
75-
"""Helper for providing tracer for wrapper functions."""
76-
77-
def _with_tracer(open_ai_definitions, langfuse, initialize):
75+
def _langfuse_wrapper(func):
76+
def _with_langfuse(open_ai_definitions, langfuse, initialize):
7877
def wrapper(wrapped, instance, args, kwargs):
7978
return func(open_ai_definitions, langfuse, initialize, wrapped, instance, args, kwargs)
8079

8180
return wrapper
8281

83-
return _with_tracer
82+
return _with_langfuse
8483

8584

8685
def _get_langfuse_data_from_kwargs(resource: OpenAiDefinition, langfuse: Langfuse, start_time, kwargs):
@@ -101,7 +100,10 @@ def _get_langfuse_data_from_kwargs(resource: OpenAiDefinition, langfuse: Langfus
101100
if metadata is not None and not isinstance(metadata, dict):
102101
raise TypeError("metadata must be a dictionary")
103102

103+
model = kwargs.get("model", None)
104+
104105
prompt = None
106+
105107
if resource.type == "completion":
106108
prompt = kwargs.get("prompt", None)
107109
elif resource.type == "chat":
@@ -123,10 +125,70 @@ def _get_langfuse_data_from_kwargs(resource: OpenAiDefinition, langfuse: Langfus
123125
"presence_penalty": kwargs.get("presence_penalty", 0),
124126
}
125127

126-
return InitialGeneration(name=name, metadata=metadata, trace_id=trace_id, start_time=start_time, prompt=prompt, modelParameters=modelParameters)
128+
return InitialGeneration(name=name, metadata=metadata, trace_id=trace_id, start_time=start_time, prompt=prompt, modelParameters=modelParameters, model=model)
129+
130+
131+
def _get_lagnfuse_data_from_streaming_response(resource: OpenAiDefinition, response, generation: InitialGeneration, langfuse: Langfuse):
    """Consume an OpenAI streaming response, re-yielding every chunk to the
    caller, and record the accumulated generation to Langfuse when the
    stream is exhausted.

    NOTE(review): the name misspells "langfuse" as "lagnfuse"; kept as-is
    because callers reference it by this exact name.

    Args:
        resource: definition of the wrapped endpoint; ``type`` is "chat" or "completion".
        generation: the InitialGeneration built from the request kwargs.
        langfuse: client used to record the finished generation.

    Yields:
        The stream items (on openai>=1.0 each item is converted to its
        ``__dict__`` before being yielded, matching the original behavior).
    """
    final_response = [] if resource.type == "chat" else ""
    model = None
    completion_start_time = None
    # (Removed leftover `print(index)` debug statement.)
    for index, i in enumerate(response):
        # Timestamp of the first chunk approximates time-to-first-token.
        if index == 0:
            completion_start_time = datetime.now()

        if _is_openai_v1():
            i = i.__dict__

        # The model name appears on the chunks; remember the first one seen.
        model = i.get("model", None) if model is None else model

        choices = i.get("choices", [])

        for choice in choices:
            if _is_openai_v1():
                choice = choice.__dict__
            if resource.type == "chat":
                delta = choice.get("delta", None)

                if _is_openai_v1():
                    delta = delta.__dict__

                if delta.get("role", None) is not None:
                    # A chunk carrying a role opens a new message entry.
                    final_response.append({"role": delta.get("role", None), "function_call": None, "tool_calls": None, "content": None})

                elif delta.get("content", None) is not None:
                    final_response[-1]["content"] = delta.get("content", None) if final_response[-1]["content"] is None else final_response[-1]["content"] + delta.get("content", None)

                elif delta.get("function_call", None) is not None:
                    final_response[-1]["function_call"] = (
                        delta.get("function_call", None) if final_response[-1]["function_call"] is None else final_response[-1]["function_call"] + delta.get("function_call", None)
                    )
                # Fixed: the OpenAI streaming delta key is "tool_calls", not
                # "tools_call" — the original lookup could never match, so
                # tool calls were silently dropped.
                elif delta.get("tool_calls", None) is not None:
                    final_response[-1]["tool_calls"] = delta.get("tool_calls", None) if final_response[-1]["tool_calls"] is None else final_response[-1]["tool_calls"] + delta.get("tool_calls", None)
            if resource.type == "completion":
                # Guard against a chunk with no text so concatenation
                # cannot raise TypeError (str += None).
                final_response += choice.get("text", None) or ""

        yield i

    def get_response_for_chat():
        # Return whichever field of the final message was populated, if any.
        if len(final_response) > 0:
            if final_response[-1].get("content", None) is not None:
                return final_response[-1]["content"]
            elif final_response[-1].get("function_call", None) is not None:
                return final_response[-1]["function_call"]
            elif final_response[-1].get("tool_calls", None) is not None:
                return final_response[-1]["tool_calls"]
        return None

    new_generation = generation.copy(
        update={"end_time": datetime.now(), "completion": get_response_for_chat() if resource.type == "chat" else final_response, "completion_start_time": completion_start_time}
    )
    if model is not None:
        new_generation = new_generation.copy(update={"model": model})
    langfuse.generation(new_generation)
189+
190+
191+
def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response):
130192
model = response.get("model", None)
131193

132194
completion = None
@@ -151,7 +213,11 @@ def _is_openai_v1():
151213
return StrictVersion(openai.__version__) >= StrictVersion("1.0.0")
152214

153215

154-
@_with_tracer_wrapper
216+
def _is_streaming_response(response):
217+
return isinstance(response, types.GeneratorType) or (_is_openai_v1() and isinstance(response, openai.Stream))
218+
219+
220+
@_langfuse_wrapper
155221
def _wrap(open_ai_resource: OpenAiDefinition, langfuse: Langfuse, initialize, wrapped, instance, args, kwargs):
156222
new_langfuse = initialize()
157223

@@ -161,11 +227,16 @@ def _wrap(open_ai_resource: OpenAiDefinition, langfuse: Langfuse, initialize, wr
161227
generation = _get_langfuse_data_from_kwargs(open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args())
162228
updated_generation = generation
163229
try:
164-
result = wrapped(**arg_extractor.get_openai_args())
165-
model, completion, usage = _get_langfuse_data_from_response(open_ai_resource, result.__dict__ if _is_openai_v1() else result)
166-
updated_generation = generation.copy(update={"model": model, "completion": completion, "end_time": datetime.now(), "usage": usage})
167-
new_langfuse.generation(updated_generation)
168-
return result
230+
openai_response = wrapped(**arg_extractor.get_openai_args())
231+
232+
if _is_streaming_response(openai_response):
233+
return _get_lagnfuse_data_from_streaming_response(open_ai_resource, openai_response, updated_generation, new_langfuse)
234+
235+
else:
236+
model, completion, usage = _get_langfuse_data_from_default_response(open_ai_resource, openai_response.__dict__ if _is_openai_v1() else openai_response)
237+
updated_generation = generation.copy(update={"model": model, "completion": completion, "end_time": datetime.now(), "usage": usage})
238+
new_langfuse.generation(updated_generation)
239+
return openai_response
169240
except Exception as ex:
170241
model = kwargs.get("model", None)
171242
new_langfuse.generation(updated_generation.copy(update={"end_time": datetime.now(), "status_message": str(ex), "level": "ERROR", "model": model}))
@@ -191,9 +262,8 @@ def initialize(self):
191262
self._langfuse = Langfuse(public_key=openai.langfuse_public_key, secret_key=openai.langfuse_secret_key, host=openai.langfuse_host)
192263
return self._langfuse
193264

194-
def flush(cls):
    # NOTE(review): the @classmethod decorator was removed in this commit,
    # so despite the `cls` name this now runs as a plain instance method and
    # `cls` receives the OpenAILangfuse instance. `_langfuse` is resolved on
    # that instance — confirm it is always initialized before flush is called.
    cls._langfuse.flush()
197267

198268
def register_tracing(self):
199269
resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0

langfuse/task_manager.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ def _next(self):
6666
break
6767
try:
6868
item = queue.get(block=True, timeout=self._flush_interval - elapsed)
69-
self._log.debug("got item from queue", item)
7069
item_size = len(json.dumps(item, cls=DatetimeSerializer).encode())
7170
self._log.debug(f"item size {item_size}")
7271
items.append(item)

poetry.lock

Lines changed: 10 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments (0)