|
5 | 5 |
|
6 | 6 |
|
7 | 7 | from langfuse import Langfuse |
8 | | -from langfuse.client import InitialGeneration, CreateTrace |
| 8 | +from langfuse.client import InitialGeneration, CreateTrace, StatefulGenerationClient |
9 | 9 |
|
10 | 10 | from distutils.version import StrictVersion |
11 | 11 | import openai |
12 | 12 | from wrapt import wrap_function_wrapper |
13 | 13 |
|
| 14 | +from langfuse.model import UpdateGeneration |
| 15 | + |
14 | 16 |
|
15 | 17 | class OpenAiDefinition: |
16 | 18 | module: str |
@@ -128,7 +130,7 @@ def _get_langfuse_data_from_kwargs(resource: OpenAiDefinition, langfuse: Langfus |
128 | 130 | return InitialGeneration(name=name, metadata=metadata, trace_id=trace_id, start_time=start_time, prompt=prompt, modelParameters=modelParameters, model=model) |
129 | 131 |
|
130 | 132 |
|
131 | | -def _get_lagnfuse_data_from_streaming_response(resource: OpenAiDefinition, response, generation: InitialGeneration, langfuse: Langfuse): |
| 133 | +def _get_lagnfuse_data_from_streaming_response(resource: OpenAiDefinition, response, generation: StatefulGenerationClient, langfuse: Langfuse): |
132 | 134 | final_response = [] if resource.type == "chat" else "" |
133 | 135 | model = None |
134 | 136 | completion_start_time = None |
@@ -180,12 +182,10 @@ def get_response_for_chat(): |
180 | 182 | return final_response[-1]["tool_calls"] |
181 | 183 | return None |
182 | 184 |
|
183 | | - new_generation = generation.copy( |
184 | | - update={"end_time": datetime.now(), "completion": get_response_for_chat() if resource.type == "chat" else final_response, "completion_start_time": completion_start_time} |
185 | | - ) |
| 185 | + update = UpdateGeneration(end_time=datetime.now(), completion=get_response_for_chat() if resource.type == "chat" else final_response, completion_start_time=completion_start_time) |
186 | 186 | if model is not None: |
187 | | - new_generation = new_generation.copy(update={"model": model}) |
188 | | - langfuse.generation(new_generation) |
| 187 | + update = update.copy(update={"model": model}) |
| 188 | + generation.update(update) |
189 | 189 |
|
190 | 190 |
|
191 | 191 | def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response): |
@@ -225,21 +225,20 @@ def _wrap(open_ai_resource: OpenAiDefinition, langfuse: Langfuse, initialize, wr |
225 | 225 | arg_extractor = OpenAiArgsExtractor(*args, **kwargs) |
226 | 226 |
|
227 | 227 | generation = _get_langfuse_data_from_kwargs(open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()) |
228 | | - updated_generation = generation |
| 228 | + generation = new_langfuse.generation(generation) |
229 | 229 | try: |
230 | 230 | openai_response = wrapped(**arg_extractor.get_openai_args()) |
231 | 231 |
|
232 | 232 | if _is_streaming_response(openai_response): |
233 | | - return _get_lagnfuse_data_from_streaming_response(open_ai_resource, openai_response, updated_generation, new_langfuse) |
| 233 | + return _get_lagnfuse_data_from_streaming_response(open_ai_resource, openai_response, generation, new_langfuse) |
234 | 234 |
|
235 | 235 | else: |
236 | 236 | model, completion, usage = _get_langfuse_data_from_default_response(open_ai_resource, openai_response.__dict__ if _is_openai_v1() else openai_response) |
237 | | - updated_generation = generation.copy(update={"model": model, "completion": completion, "end_time": datetime.now(), "usage": usage}) |
238 | | - new_langfuse.generation(updated_generation) |
| 237 | + generation.update(UpdateGeneration(model=model, completion=completion, end_time=datetime.now(), usage=usage)) |
239 | 238 | return openai_response |
240 | 239 | except Exception as ex: |
241 | 240 | model = kwargs.get("model", None) |
242 | | - new_langfuse.generation(updated_generation.copy(update={"end_time": datetime.now(), "status_message": str(ex), "level": "ERROR", "model": model})) |
| 241 | + generation.update(UpdateGeneration(endTime=datetime.now(), statusMessage=str(ex), level="ERROR", model=model)) |
243 | 242 | raise ex |
244 | 243 |
|
245 | 244 |
|
|
0 commit comments