Skip to content

Fix async generator case in decorator#1233

Closed
kouk wants to merge 5 commits intolangfuse:mainfrom
kouk:patch-1
Closed

Fix async generator case in decorator#1233
kouk wants to merge 5 commits intolangfuse:mainfrom
kouk:patch-1

Conversation

@kouk
Copy link
Copy Markdown

@kouk kouk commented Jun 30, 2025

The decorator code in langfuse/_client/observe.py seems to have been built to support the case of observing both async and async generators. Based on the code it seems the intent is that the resulting generator should be wrapped in an nested async generator function so to capture the output. However this only works for the case of the sync generator, since in the case os an async generator function:

  1. the decorator is using asyncio.iscoroutinefunction to decide whether to use the async or the sync wrapper which doesn't detect async generator functions (the inspect.isasyncgenfunction does however). So the sync wrapper is used, which would work ok (since async generator functions should not be awaited) except
  2. the sync wrapper checks if the result is a sync generator to decide whether to nest it inside another generator (using inspect.isgenerator), but even if that check was updated to detected the async generator it would still use the wrong implementation (the correct would be _wrap_async_generator_result).

This means that:

  1. the output that is captured is simply <async generator>, and
  2. if there's no parent trace context already the result of the async generator function is returned before it has begun execution so the trace context is lost. This results in the nested observations from the async generator belonging to their own separate trace. This is an issue that is not being addressed in this PR.

We could use inspect.asyncgenfunction to detect this case at decoration time and use the async wrapper instead of the sync wrapper, but this would have the weird side effect that the decorated method would be an awaitable coroutine (in contrast to an async generator function which is not awaitable). When awaited this coroutine would returns an async generator function, which is undesirable.

So it seems that for async generator functions the decorator could in fact continue to use the sync wrapper, but that the wrapper should be able to handle the case where the result of calling the function is an async generator. This is the approach of this PR.

demo

with this code and current version of langfuse

import asyncio
from langfuse import observe

@observe
async def llmprint(s):
    print(s)

@observe
async def myasyncgen():
    await llmprint("hi")
    yield "lol"
    await llmprint("bye")

# since there isn't a decorator on this we will generate 4 traces instead of 2  
async def run_test():
    async for lol in myasyncgen():
        await llmprint(lol)

if __name__ == "__main__":
    asyncio.run(run_test())

I see this
image

However with the code from this PR I see instead:
image


Important

Fixes async generator handling in @observe decorator by using inspect.isasyncgenfunction and updating sync wrapper for async generator results.

  • Behavior:
    • Fixes async generator handling in @observe decorator in langfuse/_client/observe.py.
    • Uses inspect.isasyncgenfunction to detect async generator functions.
    • Updates sync wrapper to handle async generator results using inspect.isasyncgen.
  • Imports:
    • Removes unused asyncio import from langfuse/_client/observe.py.
  • Testing:
    • Validated by existing test test_async_generator_as_return_value().

This description was created by Ellipsis for 2e03b03. You can customize this summary. It will automatically update as commits are pushed.


Greptile Summary

Disclaimer: Experimental PR review

Fixes critical bug in @observe decorator's async generator handling by improving detection and maintaining proper execution context.

  • Replaces asyncio.iscoroutinefunction with inspect module functions for accurate async generator detection
  • Ensures async generator output is properly captured instead of showing <async generator>
  • Prevents trace context loss by maintaining execution context for async generators
  • Fixes issue where nested observations from async generators were incorrectly creating separate traces
  • Validated by existing test_async_generator_as_return_value which confirms proper async generator behavior

The decorator code in `langfuse/_client/observe.py` seems to have been built to support the case of observing an async generator however it misses the mark by using `asyncio.iscoroutinefunction` to decide whether to wrap it in the async or the sync implementation. That method doesn't detect async generators.

The solution is to use the inspect module and check for both cases.
@CLAassistant
Copy link
Copy Markdown

CLAassistant commented Jun 30, 2025

CLA assistant check
All committers have signed the CLA.

Copy link
Copy Markdown
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

1 file reviewed, no comments
Edit PR Review Bot Settings | Greptile

@kouk kouk marked this pull request as draft June 30, 2025 16:58
@kouk kouk marked this pull request as ready for review July 1, 2025 09:45
Copy link
Copy Markdown
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

1 file reviewed, no comments
Edit PR Review Bot Settings | Greptile

@kouk
Copy link
Copy Markdown
Author

kouk commented Jul 7, 2025

related to langfuse/langfuse#7226

@hassiebp
Copy link
Copy Markdown
Contributor

Thanks a lot for your contribution, will be added in #1259

@hassiebp hassiebp closed this Jul 16, 2025
@hassiebp
Copy link
Copy Markdown
Contributor

Released in Python SDK v3.2.1

@kouk
Copy link
Copy Markdown
Author

kouk commented Jul 17, 2025

@hassiebp thanks! I tested the new version and now the issue I reported returns the output correctly!

There is still some unexpected behavior that I mentioned in #1233 where each "yield" from the async generator produces it's own trace, but not sure if that should be considered a bug. In any case one can easily workaround the problem by starting a trace before creating the generator.

However while re-testing this I found something else that was unexpected for me, and maybe worth opening a new issue about it (let me know what you think). Here's a test case:

import asyncio
from langfuse import observe

@observe
async def llmprint(s):
    print(s)

@observe
async def myasyncgen():
    await llmprint("hi")
    yield "lol1"
    await llmprint("having fun?")
    yield "lol2"
    await llmprint("bye")

@observe
async def run_test():
    async for lol in myasyncgen():
        await llmprint(lol)

if __name__ == "__main__":
    asyncio.run(run_test())

What I see in langfuse is this (link to trace in langfuse):
image

I think it's probably ok that the two spans from the async for loop's inner code are nested under the top level span. But shouldn't the three spans that were created inside the async generator be nested under the async generator's span?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants