Skip to content

Commit fc44bec

Browse files
authored
feat(decorator): require langfuse_public_key only in top most decorated func (#1281)
1 parent f84a7b6 commit fc44bec

File tree

4 files changed

+881
-147
lines changed

4 files changed

+881
-147
lines changed

langfuse/_client/get_client.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,37 @@
1-
from typing import Optional
1+
from contextlib import contextmanager
2+
from contextvars import ContextVar
3+
from typing import Iterator, Optional
24

35
from langfuse._client.client import Langfuse
46
from langfuse._client.resource_manager import LangfuseResourceManager
57
from langfuse.logger import langfuse_logger
68

9+
# Context variable to track the current langfuse_public_key in execution context
10+
_current_public_key: ContextVar[Optional[str]] = ContextVar(
11+
"langfuse_public_key", default=None
12+
)
13+
14+
15+
@contextmanager
16+
def _set_current_public_key(public_key: Optional[str]) -> Iterator[None]:
17+
"""Context manager to set and restore the current public key in execution context.
18+
19+
Args:
20+
public_key: The public key to set in context. If None, context is not modified.
21+
22+
Yields:
23+
None
24+
"""
25+
if public_key is None:
26+
yield # Don't modify context if no key provided
27+
return
28+
29+
token = _current_public_key.set(public_key)
30+
try:
31+
yield
32+
finally:
33+
_current_public_key.reset(token)
34+
735

836
def get_client(*, public_key: Optional[str] = None) -> Langfuse:
937
"""Get or create a Langfuse client instance.
@@ -49,6 +77,10 @@ def get_client(*, public_key: Optional[str] = None) -> Langfuse:
4977
with LangfuseResourceManager._lock:
5078
active_instances = LangfuseResourceManager._instances
5179

80+
# If no explicit public_key provided, check execution context
81+
if not public_key:
82+
public_key = _current_public_key.get(None)
83+
5284
if not public_key:
5385
if len(active_instances) == 0:
5486
# No clients initialized yet, create default instance

langfuse/_client/observe.py

Lines changed: 115 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from langfuse._client.environment_variables import (
2626
LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED,
2727
)
28-
from langfuse._client.get_client import get_client
28+
from langfuse._client.get_client import _set_current_public_key, get_client
2929
from langfuse._client.span import LangfuseGeneration, LangfuseSpan
3030
from langfuse.types import TraceContext
3131

@@ -231,72 +231,75 @@ async def async_wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any:
231231
else None
232232
)
233233
public_key = cast(str, kwargs.pop("langfuse_public_key", None))
234-
langfuse_client = get_client(public_key=public_key)
235-
context_manager: Optional[
236-
Union[
237-
_AgnosticContextManager[LangfuseGeneration],
238-
_AgnosticContextManager[LangfuseSpan],
239-
]
240-
] = (
241-
(
242-
langfuse_client.start_as_current_generation(
243-
name=final_name,
244-
trace_context=trace_context,
245-
input=input,
246-
end_on_exit=False, # when returning a generator, closing on exit would be to early
247-
)
248-
if as_type == "generation"
249-
else langfuse_client.start_as_current_span(
250-
name=final_name,
251-
trace_context=trace_context,
252-
input=input,
253-
end_on_exit=False, # when returning a generator, closing on exit would be to early
234+
235+
# Set public key in execution context for nested decorated functions
236+
with _set_current_public_key(public_key):
237+
langfuse_client = get_client(public_key=public_key)
238+
context_manager: Optional[
239+
Union[
240+
_AgnosticContextManager[LangfuseGeneration],
241+
_AgnosticContextManager[LangfuseSpan],
242+
]
243+
] = (
244+
(
245+
langfuse_client.start_as_current_generation(
246+
name=final_name,
247+
trace_context=trace_context,
248+
input=input,
249+
end_on_exit=False, # when returning a generator, closing on exit would be to early
250+
)
251+
if as_type == "generation"
252+
else langfuse_client.start_as_current_span(
253+
name=final_name,
254+
trace_context=trace_context,
255+
input=input,
256+
end_on_exit=False, # when returning a generator, closing on exit would be to early
257+
)
254258
)
259+
if langfuse_client
260+
else None
255261
)
256-
if langfuse_client
257-
else None
258-
)
259262

260-
if context_manager is None:
261-
return await func(*args, **kwargs)
263+
if context_manager is None:
264+
return await func(*args, **kwargs)
262265

263-
with context_manager as langfuse_span_or_generation:
264-
is_return_type_generator = False
266+
with context_manager as langfuse_span_or_generation:
267+
is_return_type_generator = False
265268

266-
try:
267-
result = await func(*args, **kwargs)
269+
try:
270+
result = await func(*args, **kwargs)
268271

269-
if capture_output is True:
270-
if inspect.isgenerator(result):
271-
is_return_type_generator = True
272+
if capture_output is True:
273+
if inspect.isgenerator(result):
274+
is_return_type_generator = True
272275

273-
return self._wrap_sync_generator_result(
274-
langfuse_span_or_generation,
275-
result,
276-
transform_to_string,
277-
)
276+
return self._wrap_sync_generator_result(
277+
langfuse_span_or_generation,
278+
result,
279+
transform_to_string,
280+
)
278281

279-
if inspect.isasyncgen(result):
280-
is_return_type_generator = True
282+
if inspect.isasyncgen(result):
283+
is_return_type_generator = True
281284

282-
return self._wrap_async_generator_result(
283-
langfuse_span_or_generation,
284-
result,
285-
transform_to_string,
286-
)
285+
return self._wrap_async_generator_result(
286+
langfuse_span_or_generation,
287+
result,
288+
transform_to_string,
289+
)
287290

288-
langfuse_span_or_generation.update(output=result)
291+
langfuse_span_or_generation.update(output=result)
289292

290-
return result
291-
except Exception as e:
292-
langfuse_span_or_generation.update(
293-
level="ERROR", status_message=str(e)
294-
)
293+
return result
294+
except Exception as e:
295+
langfuse_span_or_generation.update(
296+
level="ERROR", status_message=str(e)
297+
)
295298

296-
raise e
297-
finally:
298-
if not is_return_type_generator:
299-
langfuse_span_or_generation.end()
299+
raise e
300+
finally:
301+
if not is_return_type_generator:
302+
langfuse_span_or_generation.end()
300303

301304
return cast(F, async_wrapper)
302305

@@ -333,72 +336,75 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
333336
else None
334337
)
335338
public_key = kwargs.pop("langfuse_public_key", None)
336-
langfuse_client = get_client(public_key=public_key)
337-
context_manager: Optional[
338-
Union[
339-
_AgnosticContextManager[LangfuseGeneration],
340-
_AgnosticContextManager[LangfuseSpan],
341-
]
342-
] = (
343-
(
344-
langfuse_client.start_as_current_generation(
345-
name=final_name,
346-
trace_context=trace_context,
347-
input=input,
348-
end_on_exit=False, # when returning a generator, closing on exit would be to early
349-
)
350-
if as_type == "generation"
351-
else langfuse_client.start_as_current_span(
352-
name=final_name,
353-
trace_context=trace_context,
354-
input=input,
355-
end_on_exit=False, # when returning a generator, closing on exit would be to early
339+
340+
# Set public key in execution context for nested decorated functions
341+
with _set_current_public_key(public_key):
342+
langfuse_client = get_client(public_key=public_key)
343+
context_manager: Optional[
344+
Union[
345+
_AgnosticContextManager[LangfuseGeneration],
346+
_AgnosticContextManager[LangfuseSpan],
347+
]
348+
] = (
349+
(
350+
langfuse_client.start_as_current_generation(
351+
name=final_name,
352+
trace_context=trace_context,
353+
input=input,
354+
end_on_exit=False, # when returning a generator, closing on exit would be to early
355+
)
356+
if as_type == "generation"
357+
else langfuse_client.start_as_current_span(
358+
name=final_name,
359+
trace_context=trace_context,
360+
input=input,
361+
end_on_exit=False, # when returning a generator, closing on exit would be to early
362+
)
356363
)
364+
if langfuse_client
365+
else None
357366
)
358-
if langfuse_client
359-
else None
360-
)
361367

362-
if context_manager is None:
363-
return func(*args, **kwargs)
368+
if context_manager is None:
369+
return func(*args, **kwargs)
364370

365-
with context_manager as langfuse_span_or_generation:
366-
is_return_type_generator = False
371+
with context_manager as langfuse_span_or_generation:
372+
is_return_type_generator = False
367373

368-
try:
369-
result = func(*args, **kwargs)
374+
try:
375+
result = func(*args, **kwargs)
370376

371-
if capture_output is True:
372-
if inspect.isgenerator(result):
373-
is_return_type_generator = True
377+
if capture_output is True:
378+
if inspect.isgenerator(result):
379+
is_return_type_generator = True
374380

375-
return self._wrap_sync_generator_result(
376-
langfuse_span_or_generation,
377-
result,
378-
transform_to_string,
379-
)
381+
return self._wrap_sync_generator_result(
382+
langfuse_span_or_generation,
383+
result,
384+
transform_to_string,
385+
)
380386

381-
if inspect.isasyncgen(result):
382-
is_return_type_generator = True
387+
if inspect.isasyncgen(result):
388+
is_return_type_generator = True
383389

384-
return self._wrap_async_generator_result(
385-
langfuse_span_or_generation,
386-
result,
387-
transform_to_string,
388-
)
390+
return self._wrap_async_generator_result(
391+
langfuse_span_or_generation,
392+
result,
393+
transform_to_string,
394+
)
389395

390-
langfuse_span_or_generation.update(output=result)
396+
langfuse_span_or_generation.update(output=result)
391397

392-
return result
393-
except Exception as e:
394-
langfuse_span_or_generation.update(
395-
level="ERROR", status_message=str(e)
396-
)
398+
return result
399+
except Exception as e:
400+
langfuse_span_or_generation.update(
401+
level="ERROR", status_message=str(e)
402+
)
397403

398-
raise e
399-
finally:
400-
if not is_return_type_generator:
401-
langfuse_span_or_generation.end()
404+
raise e
405+
finally:
406+
if not is_return_type_generator:
407+
langfuse_span_or_generation.end()
402408

403409
return cast(F, sync_wrapper)
404410

0 commit comments

Comments
 (0)