forked from langfuse/langfuse-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathobserve.py
More file actions
458 lines (389 loc) · 16.8 KB
/
observe.py
File metadata and controls
458 lines (389 loc) · 16.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
import inspect
import logging
import os
from functools import wraps
from typing import (
Any,
AsyncGenerator,
Callable,
Dict,
Generator,
Iterable,
Literal,
Optional,
Tuple,
TypeVar,
Union,
cast,
overload,
)
from typing_extensions import ParamSpec
from langfuse._client.environment_variables import (
LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED,
)
from langfuse._client.get_client import get_client
from langfuse._client.span import LangfuseGeneration, LangfuseSpan
from langfuse.types import TraceContext
F = TypeVar("F", bound=Callable[..., Any])
P = ParamSpec("P")
R = TypeVar("R")
class LangfuseDecorator:
"""Implementation of the @observe decorator for seamless Langfuse tracing integration.
This class provides the core functionality for the @observe decorator, which enables
automatic tracing of functions and methods in your application with Langfuse.
It handles both synchronous and asynchronous functions, maintains proper trace context,
and intelligently routes to the correct Langfuse client instance.
The implementation follows a singleton pattern where a single decorator instance
handles all @observe decorations throughout the application codebase.
Features:
- Automatic span creation and management for both sync and async functions
- Proper trace context propagation between decorated functions
- Specialized handling for LLM-related spans with the 'as_type="generation"' parameter
- Type-safe decoration that preserves function signatures and type hints
- Support for explicit trace and parent span ID specification
- Thread-safe client resolution when multiple Langfuse projects are used
"""
_log = logging.getLogger("langfuse")
@overload
def observe(self, func: F) -> F: ...
@overload
def observe(
self,
func: None = None,
*,
name: Optional[str] = None,
as_type: Optional[Literal["generation"]] = None,
capture_input: Optional[bool] = None,
capture_output: Optional[bool] = None,
transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> Callable[[F], F]: ...
def observe(
self,
func: Optional[F] = None,
*,
name: Optional[str] = None,
as_type: Optional[Literal["generation"]] = None,
capture_input: Optional[bool] = None,
capture_output: Optional[bool] = None,
transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> Union[F, Callable[[F], F]]:
"""Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
This decorator provides seamless integration of Langfuse observability into your codebase. It automatically creates
spans or generations around function execution, capturing timing, inputs/outputs, and error states. The decorator
intelligently handles both synchronous and asynchronous functions, preserving function signatures and type hints.
Using OpenTelemetry's distributed tracing system, it maintains proper trace context propagation throughout your application,
enabling you to see hierarchical traces of function calls with detailed performance metrics and function-specific details.
Args:
func (Optional[Callable]): The function to decorate. When used with parentheses @observe(), this will be None.
name (Optional[str]): Custom name for the created trace or span. If not provided, the function name is used.
as_type (Optional[Literal["generation"]]): Set to "generation" to create a specialized LLM generation span
with model metrics support, suitable for tracking language model outputs.
Returns:
Callable: A wrapped version of the original function that automatically creates and manages Langfuse spans.
Example:
For general function tracing with automatic naming:
```python
@observe()
def process_user_request(user_id, query):
# Function is automatically traced with name "process_user_request"
return get_response(query)
```
For language model generation tracking:
```python
@observe(name="answer-generation", as_type="generation")
async def generate_answer(query):
# Creates a generation-type span with extended LLM metrics
response = await openai.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": query}]
)
return response.choices[0].message.content
```
For trace context propagation between functions:
```python
@observe()
def main_process():
# Parent span is created
return sub_process() # Child span automatically connected to parent
@observe()
def sub_process():
# Automatically becomes a child span of main_process
return "result"
```
Raises:
Exception: Propagates any exceptions from the wrapped function after logging them in the trace.
Notes:
- The decorator preserves the original function's signature, docstring, and return type.
- Proper parent-child relationships between spans are automatically maintained.
- Special keyword arguments can be passed to control tracing:
- langfuse_trace_id: Explicitly set the trace ID for this function call
- langfuse_parent_observation_id: Explicitly set the parent span ID
- langfuse_public_key: Use a specific Langfuse project (when multiple clients exist)
- For async functions, the decorator returns an async function wrapper.
- For sync functions, the decorator returns a synchronous wrapper.
"""
function_io_capture_enabled = os.environ.get(
LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED, "True"
).lower() not in ("false", "0")
should_capture_input = (
capture_input if capture_input is not None else function_io_capture_enabled
)
should_capture_output = (
capture_output
if capture_output is not None
else function_io_capture_enabled
)
def decorator(func: F) -> F:
return (
self._async_observe(
func,
name=name,
as_type=as_type,
capture_input=should_capture_input,
capture_output=should_capture_output,
transform_to_string=transform_to_string,
)
if inspect.iscoroutinefunction(func)
else self._sync_observe(
func,
name=name,
as_type=as_type,
capture_input=should_capture_input,
capture_output=should_capture_output,
transform_to_string=transform_to_string,
)
)
"""Handle decorator with or without parentheses.
This logic enables the decorator to work both with and without parentheses:
- @observe - Python passes the function directly to the decorator
- @observe() - Python calls the decorator first, which must return a function decorator
When called without arguments (@observe), the func parameter contains the function to decorate,
so we directly apply the decorator to it. When called with parentheses (@observe()),
func is None, so we return the decorator function itself for Python to apply in the next step.
"""
if func is None:
return decorator
else:
return decorator(func)
def _async_observe(
self,
func: F,
*,
name: Optional[str],
as_type: Optional[Literal["generation"]],
capture_input: bool,
capture_output: bool,
transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> F:
@wraps(func)
async def async_wrapper(*args, **kwargs):
trace_id = kwargs.pop("langfuse_trace_id", None)
parent_observation_id = kwargs.pop("langfuse_parent_observation_id", None)
trace_context: Optional[TraceContext] = (
{
"trace_id": trace_id,
"parent_span_id": parent_observation_id,
}
if trace_id
else None
)
final_name = name or func.__name__
input = (
self._get_input_from_func_args(
is_method=self._is_method(func),
func_args=args,
func_kwargs=kwargs,
)
if capture_input
else None
)
public_key = kwargs.pop("langfuse_public_key", None)
langfuse_client = get_client(public_key=public_key)
context_manager = (
(
langfuse_client.start_as_current_generation(
name=final_name,
trace_context=trace_context,
input=input,
end_on_exit=False, # when returning a generator, closing on exit would be to early
)
if as_type == "generation"
else langfuse_client.start_as_current_span(
name=final_name,
trace_context=trace_context,
input=input,
end_on_exit=False, # when returning a generator, closing on exit would be to early
)
)
if langfuse_client
else None
)
if context_manager is None:
return await func(*args, **kwargs)
with context_manager as langfuse_span_or_generation:
is_return_type_generator = False
try:
result = await func(*args, **kwargs)
if capture_output is True:
if inspect.isasyncgen(result):
is_return_type_generator = True
return self._wrap_async_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)
langfuse_span_or_generation.update(output=result)
return result
except Exception as e:
langfuse_span_or_generation.update(
level="ERROR", status_message=str(e)
)
raise e
finally:
if not is_return_type_generator:
langfuse_span_or_generation.end()
return cast(F, async_wrapper)
def _sync_observe(
self,
func: F,
*,
name: Optional[str],
as_type: Optional[Literal["generation"]],
capture_input: bool,
capture_output: bool,
transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> F:
@wraps(func)
def sync_wrapper(*args, **kwargs):
trace_id = kwargs.pop("langfuse_trace_id", None)
parent_observation_id = kwargs.pop("langfuse_parent_observation_id", None)
trace_context: Optional[TraceContext] = (
{
"trace_id": trace_id,
"parent_span_id": parent_observation_id,
}
if trace_id
else None
)
final_name = name or func.__name__
input = (
self._get_input_from_func_args(
is_method=self._is_method(func),
func_args=args,
func_kwargs=kwargs,
)
if capture_input
else None
)
public_key = kwargs.pop("langfuse_public_key", None)
langfuse_client = get_client(public_key=public_key)
context_manager = (
(
langfuse_client.start_as_current_generation(
name=final_name,
trace_context=trace_context,
input=input,
end_on_exit=False, # when returning a generator, closing on exit would be to early
)
if as_type == "generation"
else langfuse_client.start_as_current_span(
name=final_name,
trace_context=trace_context,
input=input,
end_on_exit=False, # when returning a generator, closing on exit would be to early
)
)
if langfuse_client
else None
)
if context_manager is None:
return func(*args, **kwargs)
with context_manager as langfuse_span_or_generation:
is_return_type_generator = False
try:
result = func(*args, **kwargs)
if capture_output is True:
if inspect.isgenerator(result):
is_return_type_generator = True
return self._wrap_sync_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)
elif inspect.isasyncgen(result):
is_return_type_generator = True
return self._wrap_async_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)
langfuse_span_or_generation.update(output=result)
return result
except Exception as e:
langfuse_span_or_generation.update(
level="ERROR", status_message=str(e)
)
raise e
finally:
if not is_return_type_generator:
langfuse_span_or_generation.end()
return cast(F, sync_wrapper)
@staticmethod
def _is_method(func: Callable) -> bool:
return (
"self" in inspect.signature(func).parameters
or "cls" in inspect.signature(func).parameters
)
def _get_input_from_func_args(
self,
*,
is_method: bool = False,
func_args: Tuple = (),
func_kwargs: Dict = {},
) -> Dict:
# Remove implicitly passed "self" or "cls" argument for instance or class methods
logged_args = func_args[1:] if is_method else func_args
return {
"args": logged_args,
"kwargs": func_kwargs,
}
def _wrap_sync_generator_result(
self,
langfuse_span_or_generation: Union[LangfuseSpan, LangfuseGeneration],
generator: Generator,
transform_to_string: Optional[Callable[[Iterable], str]] = None,
):
items = []
try:
for item in generator:
items.append(item)
yield item
finally:
output = items
if transform_to_string is not None:
output = transform_to_string(items)
elif all(isinstance(item, str) for item in items):
output = "".join(items)
langfuse_span_or_generation.update(output=output)
langfuse_span_or_generation.end()
async def _wrap_async_generator_result(
self,
langfuse_span_or_generation: Union[LangfuseSpan, LangfuseGeneration],
generator: AsyncGenerator,
transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> AsyncGenerator:
items = []
try:
async for item in generator:
items.append(item)
yield item
finally:
output = items
if transform_to_string is not None:
output = transform_to_string(items)
elif all(isinstance(item, str) for item in items):
output = "".join(items)
langfuse_span_or_generation.update(output=output)
langfuse_span_or_generation.end()
_decorator = LangfuseDecorator()
observe = _decorator.observe