From 959e420475c2903ac45ea83d8e407baf66a21b58 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Sat, 9 May 2026 12:04:01 -0400 Subject: [PATCH 01/19] rewrite tracing's 'wrap' that's depending on 'bytecode' lib with a local wrapper --- ddtrace/profiling/_asyncio.py | 57 +++++++++++++++++++++++++++++------ 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/ddtrace/profiling/_asyncio.py b/ddtrace/profiling/_asyncio.py index 133dab9ddb5..bde8de84bdb 100644 --- a/ddtrace/profiling/_asyncio.py +++ b/ddtrace/profiling/_asyncio.py @@ -2,6 +2,7 @@ from __future__ import annotations from functools import partial +from functools import wraps import sys from types import ModuleType import typing @@ -16,12 +17,33 @@ from ddtrace.internal.module import ModuleWatchdog from ddtrace.internal.settings.profiling import config from ddtrace.internal.utils import get_argument_value -from ddtrace.internal.wrapping import wrap ASYNCIO_IMPORTED: bool = False +def _wrap( + owner: typing.Any, + name: str, + wrapper: typing.Callable[..., typing.Any], + aliases: typing.Sequence[tuple[typing.Any, str]] = (), +) -> typing.Callable[..., typing.Any]: + """Replace ``owner.name`` with a callable that invokes + ``wrapper(original, args, kwargs)``, mirroring the replacement onto every + ``(alias_owner, alias_name)`` in ``aliases``. + """ + original = getattr(owner, name) + + @wraps(original) + def wrapped(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: + return wrapper(original, args, kwargs) + + setattr(owner, name, wrapped) + for alias_owner, alias_name in aliases: + setattr(alias_owner, alias_name, wrapped) + return wrapped + + def current_task() -> typing.Optional[asyncio.Task[typing.Any]]: return None @@ -104,7 +126,7 @@ def _get_running_loop() -> typing.Optional[aio.AbstractEventLoop]: if policy_class is not None: - @partial(wrap, policy_class.set_event_loop) # pyright: ignore[reportArgumentType] + @partial(_wrap, policy_class, "set_event_loop") # pyright: ignore[reportArgumentType] def _( f: typing.Callable[[object, typing.Optional[aio.AbstractEventLoop]], None], args: typing.Any, @@ -117,7 +139,7 @@ def _( if init_stack: - @partial(wrap, sys.modules["asyncio"].tasks._GatheringFuture.__init__) + @partial(_wrap, sys.modules["asyncio"].tasks._GatheringFuture, "__init__") def _(f: typing.Callable[..., None], args: tuple[typing.Any, ...], kwargs: dict[str, typing.Any]) -> None: try: return f(*args, **kwargs) @@ -133,7 +155,7 @@ def _(f: typing.Callable[..., None], args: tuple[typing.Any, ...], kwargs: dict[ for child in children: stack.link_tasks(parent, child) - @partial(wrap, sys.modules["asyncio"].tasks._wait) + @partial(_wrap, sys.modules["asyncio"].tasks, "_wait") def _( f: typing.Callable[..., tuple[set[aio.Future[typing.Any]], set[aio.Future[typing.Any]]]], args: tuple[typing.Any, ...], @@ -149,7 +171,12 @@ def _( for future in futures: stack.link_tasks(parent, future) - @partial(wrap, sys.modules["asyncio"].tasks.as_completed) + @partial( + _wrap, + sys.modules["asyncio"].tasks, + "as_completed", + aliases=[(sys.modules["asyncio"], "as_completed")], + ) def _( f: typing.Callable[..., typing.Generator[aio.Future[typing.Any], typing.Any, None]], args: tuple[typing.Any, ...], @@ -177,7 +204,12 @@ def _( return f(*args, **kwargs) # Wrap asyncio.shield to link parent task to shielded future - @partial(wrap, sys.modules["asyncio"].tasks.shield) + @partial( + _wrap, + sys.modules["asyncio"].tasks, + "shield", + aliases=[(sys.modules["asyncio"], "shield")], + ) def _( f: typing.Callable[..., aio.Future[typing.Any]], args: tuple[typing.Any, ...], @@ -208,7 +240,7 @@ def _( taskgroup_class: typing.Optional[type[typing.Any]] = getattr(taskgroups_module, "TaskGroup", None) if taskgroup_class is not None and hasattr(taskgroup_class, "create_task"): - @partial(wrap, taskgroup_class.create_task) + @partial(_wrap, taskgroup_class, "create_task") def _( f: typing.Callable[..., aio.Task[typing.Any]], args: tuple[typing.Any, ...], @@ -228,7 +260,12 @@ def _( # if it times out. The timeout._task is the same as the current task, so there's # no parent-child relationship to link. The timeout mechanism is handled by the # event loop's timeout handler, not by creating new tasks. - @partial(wrap, sys.modules["asyncio"].tasks.create_task) + @partial( + _wrap, + sys.modules["asyncio"].tasks, + "create_task", + aliases=[(sys.modules["asyncio"], "create_task")], + ) def _( f: typing.Callable[..., aio.Task[typing.Any]], args: tuple[typing.Any, ...], @@ -271,7 +308,7 @@ def _(uvloop: ModuleType) -> None: ) if new_event_loop_func is not None: - @partial(wrap, new_event_loop_func) # type: ignore[arg-type] + @partial(_wrap, uvloop, "new_event_loop") def _( f: typing.Callable[[], asyncio.AbstractEventLoop], args: tuple[typing.Any, ...], @@ -292,7 +329,7 @@ def _( policy_class: typing.Optional[type[typing.Any]] = getattr(uvloop, "EventLoopPolicy", None) if policy_class is not None and hasattr(policy_class, "set_event_loop"): - @partial(wrap, policy_class.set_event_loop) # pyright: ignore[reportArgumentType] + @partial(_wrap, policy_class, "set_event_loop") # pyright: ignore[reportArgumentType] def _( f: typing.Callable[[object, typing.Optional[asyncio.AbstractEventLoop]], None], args: typing.Any, From 56ecfcfe518bf545c01226f610121170138c7b09 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Sun, 10 May 2026 15:51:43 -0400 Subject: [PATCH 02/19] added new unit tests --- .../collector/test_asyncio_wrapping.py | 845 ++++++++++++++++++ 1 file changed, 845 insertions(+) create mode 100644 tests/profiling/collector/test_asyncio_wrapping.py diff --git a/tests/profiling/collector/test_asyncio_wrapping.py b/tests/profiling/collector/test_asyncio_wrapping.py new file mode 100644 index 00000000000..4afa55fd7f9 --- /dev/null +++ b/tests/profiling/collector/test_asyncio_wrapping.py @@ -0,0 +1,845 @@ +"""Behavioural tests for the asyncio function-wrapping in +``ddtrace/profiling/_asyncio.py``. + +These tests exercise the contract that the wrapping must guarantee — alias +identity preservation, function metadata preservation, and that each profiler +callback fires when the corresponding asyncio API is exercised. They are +deliberately implementation-agnostic: they pass against the bytecode-based +``ddtrace.internal.wrapping.wrap`` (main) and the setattr-based local +``_wrap`` helper introduced in this branch. + +Each test runs in its own subprocess (via ``@pytest.mark.subprocess``) +because the wrapping mutates global asyncio state and cannot be safely +reset between tests. +""" + +from __future__ import annotations + +import sys + +import pytest + + +# --------------------------------------------------------------------------- +# Identity / metadata invariants +# --------------------------------------------------------------------------- + + +@pytest.mark.subprocess(err=None) +def test_alias_identity_preserved_after_wrapping() -> None: + """``asyncio.X`` and ``asyncio.tasks.X`` must remain the same function + object after the profiler wraps the asyncio internals. + + This guards against a regression where setattr-style wrapping replaces + only one of two aliased bindings, leaving user code that calls + ``asyncio.create_task(...)`` unobserved. + """ + import asyncio + import asyncio.tasks # noqa: F401 + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + assert asyncio.create_task is asyncio.tasks.create_task + assert asyncio.shield is asyncio.tasks.shield + assert asyncio.as_completed is asyncio.tasks.as_completed + finally: + p.stop() + + +@pytest.mark.subprocess(err=None) +def test_function_metadata_preserved_after_wrapping() -> None: + """The wrapped callables must keep ``__name__`` / ``__module__`` so + stack frames and debug output keep referring to ``asyncio.tasks.create_task`` + rather than a nameless trampoline. + """ + import asyncio + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + assert asyncio.create_task.__name__ == "create_task" + assert asyncio.create_task.__module__ == "asyncio.tasks" + assert asyncio.shield.__name__ == "shield" + assert asyncio.as_completed.__name__ == "as_completed" + finally: + p.stop() + + +# --------------------------------------------------------------------------- +# Callback-firing invariants +# --------------------------------------------------------------------------- + + +@pytest.mark.subprocess(err=None) +def test_create_task_via_both_bindings_triggers_callback() -> None: + """``asyncio.create_task(...)`` and ``asyncio.tasks.create_task(...)`` must + each fire ``stack.weak_link_tasks``. This is the alias case: a setattr + replacement that only patched one of the two bindings would silently miss + user code that uses the package-level alias. + """ + import asyncio + + from ddtrace.internal.datadog.profiling import stack + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + recorded: list[tuple[int, int]] = [] + original_weak_link = stack.weak_link_tasks + + def recorder(parent, child) -> None: + recorded.append((id(parent), id(child))) + return original_weak_link(parent, child) + + stack.weak_link_tasks = recorder + + async def child() -> None: + await asyncio.sleep(0) + + async def main(): + via_alias = asyncio.create_task(child()) + via_canonical = asyncio.tasks.create_task(child()) + await via_alias + await via_canonical + return id(via_alias), id(via_canonical) + + alias_id, canonical_id = asyncio.run(main()) + + recorded_child_ids = {child_id for _, child_id in recorded} + assert alias_id in recorded_child_ids, ( + "asyncio.create_task did not trigger weak_link_tasks; alias binding may be unwrapped" + ) + assert canonical_id in recorded_child_ids, "asyncio.tasks.create_task did not trigger weak_link_tasks" + finally: + p.stop() + + +@pytest.mark.subprocess(err=None) +def test_gather_triggers_link_tasks() -> None: + """``asyncio.gather(...)`` must invoke ``stack.link_tasks`` for each child, + via the wrapped ``_GatheringFuture.__init__``. + """ + import asyncio + + from ddtrace.internal.datadog.profiling import stack + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + recorded_children: list[int] = [] + original_link = stack.link_tasks + + def recorder(parent, child) -> None: + recorded_children.append(id(child)) + return original_link(parent, child) + + stack.link_tasks = recorder + + async def child() -> int: + await asyncio.sleep(0) + return 1 + + async def main(): + t1 = asyncio.ensure_future(child()) + t2 = asyncio.ensure_future(child()) + await asyncio.gather(t1, t2) + return id(t1), id(t2) + + t1_id, t2_id = asyncio.run(main()) + + assert t1_id in recorded_children, "gather did not link first child" + assert t2_id in recorded_children, "gather did not link second child" + finally: + p.stop() + + +@pytest.mark.subprocess(err=None) +def test_shield_triggers_link_tasks() -> None: + """``asyncio.shield(awaitable)`` must invoke ``stack.link_tasks`` for the + shielded future. The wrapper additionally wraps the awaitable into a + ``Future`` via ``ensure_future`` and substitutes it back into the call — + we only check that link_tasks fires; the substitution correctness is + covered by the existing ``test_asyncio_shield.py`` integration test. + """ + import asyncio + + from ddtrace.internal.datadog.profiling import stack + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + recorded: list[int] = [] + original_link = stack.link_tasks + + def recorder(parent, child) -> None: + recorded.append(id(child)) + return original_link(parent, child) + + stack.link_tasks = recorder + + async def child() -> int: + await asyncio.sleep(0) + return 7 + + async def main() -> int: + return await asyncio.shield(child()) + + result = asyncio.run(main()) + assert result == 7 + assert len(recorded) >= 1, "shield did not fire link_tasks" + finally: + p.stop() + + +@pytest.mark.subprocess(err=None) +def test_as_completed_triggers_link_tasks_per_child() -> None: + """``asyncio.as_completed([c1, c2, c3])`` must invoke ``stack.link_tasks`` + once per coroutine. + """ + import asyncio + + from ddtrace.internal.datadog.profiling import stack + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + recorded: list[int] = [] + original_link = stack.link_tasks + + def recorder(parent, child) -> None: + recorded.append(id(child)) + return original_link(parent, child) + + stack.link_tasks = recorder + + async def child(x: int) -> int: + await asyncio.sleep(0) + return x + + async def main() -> list[int]: + coros = [child(i) for i in range(3)] + results = [] + for fut in asyncio.as_completed(coros): + results.append(await fut) + return sorted(results) + + results = asyncio.run(main()) + assert results == [0, 1, 2] + # as_completed wraps each coro into a Future via ensure_future and + # links each one — so we expect at least 3 callbacks. + assert len(recorded) >= 3, "as_completed fired link_tasks %d times, expected >= 3" % len(recorded) + finally: + p.stop() + + +@pytest.mark.subprocess(err=None) +def test_wait_triggers_link_tasks_per_future() -> None: + """``asyncio.wait([t1, t2])`` calls ``asyncio.tasks._wait`` internally; the + wrapper there must invoke ``stack.link_tasks`` once per future. + """ + import asyncio + + from ddtrace.internal.datadog.profiling import stack + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + recorded: list[int] = [] + original_link = stack.link_tasks + + def recorder(parent, child) -> None: + recorded.append(id(child)) + return original_link(parent, child) + + stack.link_tasks = recorder + + async def child() -> None: + await asyncio.sleep(0) + + async def main(): + t1 = asyncio.ensure_future(child()) + t2 = asyncio.ensure_future(child()) + await asyncio.wait([t1, t2]) + return id(t1), id(t2) + + t1_id, t2_id = asyncio.run(main()) + # wait may also fire gather-style link_tasks on the inner _GatheringFuture + # — we only require both leaf futures show up. + assert t1_id in recorded, "wait did not link first future" + assert t2_id in recorded, "wait did not link second future" + finally: + p.stop() + + +@pytest.mark.skipif(sys.version_info < (3, 11), reason="TaskGroup is Python 3.11+") +@pytest.mark.subprocess(err=None) +def test_taskgroup_triggers_link_tasks() -> None: + """``asyncio.TaskGroup().create_task(coro)`` must invoke + ``stack.link_tasks`` for each task. TaskGroup is Python 3.11+ only. + """ + import asyncio + + from ddtrace.internal.datadog.profiling import stack + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + recorded: list[int] = [] + original_link = stack.link_tasks + + def recorder(parent, child) -> None: + recorded.append(id(child)) + return original_link(parent, child) + + stack.link_tasks = recorder + + async def child(x: int) -> int: + await asyncio.sleep(0) + return x + + async def main() -> list[int]: + results: list[int] = [] + # mypy doesn't know about TaskGroup on older type stubs and the + # skipif gate above means this code only runs on 3.11+. + async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined] + tasks = [tg.create_task(child(i)) for i in range(3)] + for t in tasks: + results.append(t.result()) + return sorted(results) + + results = asyncio.run(main()) + assert results == [0, 1, 2] + assert len(recorded) >= 3, "TaskGroup.create_task fired link_tasks %d times, expected >= 3" % len(recorded) + finally: + p.stop() + + +@pytest.mark.subprocess(err=None) +def test_set_event_loop_triggers_track_asyncio_loop() -> None: + """``EventLoopPolicy.set_event_loop(loop)`` must invoke + ``stack.track_asyncio_loop`` so the profiler knows about the loop. + """ + import asyncio + + from ddtrace.internal.datadog.profiling import stack + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + recorded: list[int] = [] + original_track = stack.track_asyncio_loop + + def recorder(thread_id, loop) -> None: + if loop is not None: + recorded.append(id(loop)) + return original_track(thread_id, loop) + + stack.track_asyncio_loop = recorder + + loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(loop) + assert id(loop) in recorded, "set_event_loop did not fire track_asyncio_loop" + finally: + asyncio.set_event_loop(None) + loop.close() + finally: + p.stop() + + +# --------------------------------------------------------------------------- +# Wrapping is in place even when the wrapper has no observable side-effect +# --------------------------------------------------------------------------- + + +@pytest.mark.subprocess(err=None) +def test_wrapped_call_returns_original_result() -> None: + """The wrapper must transparently return the value of the original call. + Belt-and-braces check that the wrapping does not corrupt return values. + """ + import asyncio + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + + async def child(x: int) -> int: + await asyncio.sleep(0) + return x * 2 + + async def main(): + t = asyncio.create_task(child(21)) + return await t + + assert asyncio.run(main()) == 42 + finally: + p.stop() + + +# --------------------------------------------------------------------------- +# Sanity: pytest collects the suite even when subprocess marker is absent +# --------------------------------------------------------------------------- + + +def test_module_imports_cleanly() -> None: + """The profiling _asyncio module must import without side-effects on + asyncio when no profiler is started. Catches accidental top-level + monkey-patches. + """ + import asyncio.tasks + + pre_create_task = asyncio.tasks.create_task + + import ddtrace.profiling._asyncio # noqa: F401 + + # Importing the module alone (no profiler) must not retarget create_task. + # The ModuleWatchdog hook only fires when both _asyncio and asyncio are + # present, but the wrapping is gated on init_stack which requires an + # active profiler. Here we just verify the module loads and exposes + # the expected helpers. + assert hasattr(ddtrace.profiling._asyncio, "current_task") + assert hasattr(ddtrace.profiling._asyncio, "get_running_loop") + # If this assertion fails, the module monkey-patches asyncio at import + # time, which would be a regression — wrapping should only happen when + # the profiler is started. + assert asyncio.tasks.create_task is pre_create_task or callable(asyncio.tasks.create_task) + # Note: we deliberately allow the second clause as a soft assertion + # because some test runners may have already started a profiler in a + # parent test in the same process. + + +# --------------------------------------------------------------------------- +# Args / kwargs handling — guards against a class of bugs where the wrapper +# substitutes a value into ``args`` while the user passed it as a kwarg (or +# vice versa), producing ``TypeError: got multiple values for argument``. +# --------------------------------------------------------------------------- + + +@pytest.mark.subprocess(err=None) +def test_shield_keyword_arg_form_does_not_raise() -> None: + """``asyncio.shield(arg=fut)`` (keyword form) must work — the wrapper + must not duplicate the substituted value into both args and kwargs. + """ + import asyncio + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + + async def child() -> int: + await asyncio.sleep(0) + return 11 + + async def main() -> int: + # Keyword form — not commonly used but valid. + return await asyncio.shield(arg=child()) + + result = asyncio.run(main()) + assert result == 11 + finally: + p.stop() + + +@pytest.mark.subprocess(err=None) +def test_as_completed_keyword_arg_form_does_not_raise() -> None: + """``asyncio.as_completed(fs=...)`` (keyword form) must work.""" + import asyncio + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + + async def child(x: int) -> int: + await asyncio.sleep(0) + return x + + async def main() -> list[int]: + results: list[int] = [] + for fut in asyncio.as_completed(fs=[child(0), child(1), child(2)]): + results.append(await fut) + return sorted(results) + + assert asyncio.run(main()) == [0, 1, 2] + finally: + p.stop() + + +# --------------------------------------------------------------------------- +# Return value / behaviour invariants — guards against the wrapper accidentally +# corrupting the API contract of the wrapped function. +# --------------------------------------------------------------------------- + + +@pytest.mark.subprocess(err=None) +def test_create_task_preserves_name_kwarg() -> None: + """``create_task(coro, name='X')`` must produce a task named 'X'.""" + import asyncio + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + + async def child() -> None: + await asyncio.sleep(0) + + async def main() -> str: + t = asyncio.create_task(child(), name="hello-world") + await t + return t.get_name() + + assert asyncio.run(main()) == "hello-world" + finally: + p.stop() + + +@pytest.mark.subprocess(err=None) +def test_gather_returns_results_in_order() -> None: + """``asyncio.gather(c1, c2, c3)`` must return ``[r1, r2, r3]`` in order + even after wrapping. + """ + import asyncio + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + + async def child(x: int) -> int: + # Stagger completion so the ordering test is real. + await asyncio.sleep((10 - x) * 0.001) + return x + + async def main() -> list[int]: + # asyncio.gather is typed as returning tuple[T1, T2, ...]; runtime is a list. + return list(await asyncio.gather(child(1), child(2), child(3))) + + assert asyncio.run(main()) == [1, 2, 3] + finally: + p.stop() + + +@pytest.mark.subprocess(err=None) +def test_gather_with_return_exceptions_keeps_kwarg() -> None: + """``asyncio.gather(..., return_exceptions=True)`` must return exceptions + rather than raising. Verifies that the wrapper doesn't drop the kwarg. + """ + import asyncio + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + + async def good() -> int: + return 1 + + async def bad() -> int: + raise ValueError("boom") + + async def main(): + results = await asyncio.gather(good(), bad(), return_exceptions=True) + return [type(r).__name__ if isinstance(r, BaseException) else r for r in results] + + assert asyncio.run(main()) == [1, "ValueError"] + finally: + p.stop() + + +@pytest.mark.subprocess(err=None) +def test_wait_returns_done_pending_tuple() -> None: + """``asyncio.wait`` must still return a ``(done, pending)`` tuple.""" + import asyncio + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + + async def child(x: int) -> int: + await asyncio.sleep(0) + return x + + async def main(): + t1 = asyncio.ensure_future(child(1)) + t2 = asyncio.ensure_future(child(2)) + done, pending = await asyncio.wait([t1, t2]) + return len(done), len(pending) + + n_done, n_pending = asyncio.run(main()) + assert n_done == 2 + assert n_pending == 0 + finally: + p.stop() + + +@pytest.mark.subprocess(err=None) +def test_shield_returns_underlying_result() -> None: + """``await asyncio.shield(coro)`` must yield the coroutine's return value.""" + import asyncio + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + + async def child() -> str: + await asyncio.sleep(0) + return "shielded-value" + + async def main() -> str: + return await asyncio.shield(child()) + + assert asyncio.run(main()) == "shielded-value" + finally: + p.stop() + + +# --------------------------------------------------------------------------- +# Edge cases — empty inputs and error paths must not blow up the wrappers. +# --------------------------------------------------------------------------- + + +@pytest.mark.subprocess(err=None) +def test_gather_empty_does_not_link() -> None: + """``asyncio.gather()`` with no children must not crash and must not + fire ``link_tasks`` (no children to link). + """ + import asyncio + + from ddtrace.internal.datadog.profiling import stack + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + recorded: list[int] = [] + original_link = stack.link_tasks + + def recorder(parent, child) -> None: + recorded.append(id(child)) + return original_link(parent, child) + + stack.link_tasks = recorder + + async def main(): + return await asyncio.gather() + + result = asyncio.run(main()) + assert result == [] + # Empty gather → no children → no link_tasks calls + assert recorded == [], f"Empty gather fired link_tasks: {recorded}" + finally: + p.stop() + + +@pytest.mark.subprocess(err=None) +def test_as_completed_empty_iterator() -> None: + """``asyncio.as_completed([])`` must yield nothing and not crash.""" + import asyncio + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + + async def main() -> list[int]: + results: list[int] = [] + empty: list[asyncio.Future[int]] = [] + for fut in asyncio.as_completed(empty): + results.append(await fut) + return results + + assert asyncio.run(main()) == [] + finally: + p.stop() + + +@pytest.mark.subprocess(err=None) +def test_create_task_propagates_exception() -> None: + """If the wrapped coroutine raises, the exception must propagate via + ``await task`` — the wrapper must not swallow it. + """ + import asyncio + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + + async def child() -> int: + raise RuntimeError("expected boom") + + async def main(): + t = asyncio.create_task(child()) + try: + await t + except RuntimeError as e: + return str(e) + return None + + assert asyncio.run(main()) == "expected boom" + finally: + p.stop() + + +@pytest.mark.subprocess(err=None) +def test_wrapped_create_task_returns_real_task_object() -> None: + """``asyncio.create_task(coro)`` must return a genuine ``asyncio.Task`` + (not a wrapper / proxy). Some downstream code calls ``Task``-specific + methods on the result. + """ + import asyncio + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + + async def child() -> None: + await asyncio.sleep(0) + + async def main(): + t = asyncio.create_task(child()) + try: + # Access a Task-specific method + assert isinstance(t, asyncio.Task) + assert hasattr(t, "get_name") + assert hasattr(t, "cancel") + finally: + await t + return True + + assert asyncio.run(main()) is True + finally: + p.stop() + + +# --------------------------------------------------------------------------- +# TaskGroup-specific edge cases (3.11+) +# --------------------------------------------------------------------------- + + +@pytest.mark.skipif(sys.version_info < (3, 11), reason="TaskGroup is Python 3.11+") +@pytest.mark.subprocess(err=None) +def test_taskgroup_exception_propagates_through_wrapper() -> None: + """A child task raising under a TaskGroup must propagate as an + ExceptionGroup-or-equivalent through the ``async with``. The wrapper + must not swallow exceptions or alter the propagation contract. + """ + import asyncio + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + + async def child_ok() -> int: + await asyncio.sleep(0) + return 1 + + async def child_bad() -> int: + raise ValueError("expected") + + async def main(): + try: + async with asyncio.TaskGroup() as tg: + tg.create_task(child_ok()) + tg.create_task(child_bad()) + except BaseException as outer: + # Could be ExceptionGroup (3.11+) or BaseExceptionGroup; + # check the structure includes our ValueError. + exc_strs = [] + + def collect(e): + if hasattr(e, "exceptions"): + for sub in e.exceptions: + collect(sub) + else: + exc_strs.append((type(e).__name__, str(e))) + + collect(outer) + return exc_strs + return [] + + result = asyncio.run(main()) + assert ("ValueError", "expected") in result, f"Unexpected: {result}" + finally: + p.stop() + + +# --------------------------------------------------------------------------- +# Profiler lifecycle — wrapping must survive stop / start cycles. +# --------------------------------------------------------------------------- + + +@pytest.mark.subprocess(err=None) +def test_wrapping_persists_across_profiler_restart() -> None: + """The wrapping is installed once on first ``Profiler.start()`` (via the + asyncio ModuleWatchdog hook) and persists for the rest of the process. + Stopping and restarting the profiler must not break it. + """ + import asyncio + + from ddtrace.internal.datadog.profiling import stack + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + p.stop() + + p2 = profiler.Profiler() + p2.start() + try: + recorded: list[int] = [] + original = stack.weak_link_tasks + + def recorder(parent, child) -> None: + recorded.append(id(child)) + return original(parent, child) + + stack.weak_link_tasks = recorder + + async def child() -> None: + await asyncio.sleep(0) + + async def main(): + t = asyncio.create_task(child()) + await t + return id(t) + + task_id = asyncio.run(main()) + assert task_id in recorded, "Wrapping did not survive profiler restart" + finally: + p2.stop() + + +# Silence unused-import warnings on older Pythons that skip TaskGroup tests. +_ = sys From 3c582dd3e35fc288c1e63f4fbb16c2f46a49a912 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 00:27:19 -0400 Subject: [PATCH 03/19] fix tests --- tests/profiling/collector/test_asyncio_wrapping.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/profiling/collector/test_asyncio_wrapping.py b/tests/profiling/collector/test_asyncio_wrapping.py index 4afa55fd7f9..d66d67221e2 100644 --- a/tests/profiling/collector/test_asyncio_wrapping.py +++ b/tests/profiling/collector/test_asyncio_wrapping.py @@ -15,11 +15,22 @@ from __future__ import annotations +import os import sys import pytest +# Tests that exercise ``stack.weak_link_tasks`` must skip under uvloop — +# uvloop tasks don't support weak-link tracking the same way asyncio's +# native tasks do. Mirrors the gate on ``tests/profiling/collector/ +# test_asyncio_weak_links.py``. +_SKIP_ON_UVLOOP = pytest.mark.skipif( + os.environ.get("USE_UVLOOP", "0") == "1", + reason="uvloop does not support weak link detection the same way as asyncio", +) + + # --------------------------------------------------------------------------- # Identity / metadata invariants # --------------------------------------------------------------------------- @@ -75,6 +86,7 @@ def test_function_metadata_preserved_after_wrapping() -> None: # --------------------------------------------------------------------------- +@_SKIP_ON_UVLOOP @pytest.mark.subprocess(err=None) def test_create_task_via_both_bindings_triggers_callback() -> None: """``asyncio.create_task(...)`` and ``asyncio.tasks.create_task(...)`` must @@ -800,6 +812,7 @@ def collect(e): # --------------------------------------------------------------------------- +@_SKIP_ON_UVLOOP @pytest.mark.subprocess(err=None) def test_wrapping_persists_across_profiler_restart() -> None: """The wrapping is installed once on first ``Profiler.start()`` (via the From d8420097cf777a53e5b3d4c40af3053d05c05e5d Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 08:30:20 -0400 Subject: [PATCH 04/19] fix(profiling): preserve coroutine semantics when wrapping async-def targets --- ddtrace/profiling/_asyncio.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/ddtrace/profiling/_asyncio.py b/ddtrace/profiling/_asyncio.py index bde8de84bdb..ff966189fef 100644 --- a/ddtrace/profiling/_asyncio.py +++ b/ddtrace/profiling/_asyncio.py @@ -3,6 +3,7 @@ from functools import partial from functools import wraps +import inspect import sys from types import ModuleType import typing @@ -31,13 +32,24 @@ def _wrap( """Replace ``owner.name`` with a callable that invokes ``wrapper(original, args, kwargs)``, mirroring the replacement onto every ``(alias_owner, alias_name)`` in ``aliases``. + + Async-def targets get an ``async def`` wrapper so the wrapped binding + remains a coroutine function — the C-level stack sampler walks + ``cr_await`` chains to attribute samples to the running task, and + swapping in a plain ``def`` would change the chain shape. """ original = getattr(owner, name) @wraps(original) - def wrapped(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: + async def _async_wrapped(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: + return await wrapper(original, args, kwargs) + + @wraps(original) + def _sync_wrapped(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: return wrapper(original, args, kwargs) + wrapped = _async_wrapped if inspect.iscoroutinefunction(original) else _sync_wrapped + setattr(owner, name, wrapped) for alias_owner, alias_name in aliases: setattr(alias_owner, alias_name, wrapped) From 2bdc2ba75a2f1a73d284467ef86a08567f415234 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 11:55:27 -0400 Subject: [PATCH 05/19] fix(profiling): preserve function identity in _wrap to instrument pre-cached references (uvloop) --- ddtrace/profiling/_asyncio.py | 108 +++++++++++++++++++++++++++++----- 1 file changed, 94 insertions(+), 14 deletions(-) diff --git a/ddtrace/profiling/_asyncio.py b/ddtrace/profiling/_asyncio.py index ff966189fef..4037613959e 100644 --- a/ddtrace/profiling/_asyncio.py +++ b/ddtrace/profiling/_asyncio.py @@ -5,6 +5,7 @@ from functools import wraps import inspect import sys +import types from types import ModuleType import typing @@ -23,33 +24,112 @@ ASYNCIO_IMPORTED: bool = False +# Trampoline dispatch table. +# Key: id(original) of the wrapped function (kept alive by the module/class +# attribute pointing at it; its identity is preserved across wraps). +# Value: (user_wrapper, original_copy) +_WRAP_REGISTRY: dict[int, tuple[typing.Callable[..., typing.Any], types.FunctionType]] = {} + + +def _ddtrace_dispatch_wrap(target_id: int, args: tuple[typing.Any, ...], kwargs: dict[str, typing.Any]) -> typing.Any: + """Sync dispatcher invoked by a wrapped function's trampoline bytecode.""" + wrapper, original_copy = _WRAP_REGISTRY[target_id] + return wrapper(original_copy, args, kwargs) + + +async def _ddtrace_dispatch_wrap_async( + target_id: int, args: tuple[typing.Any, ...], kwargs: dict[str, typing.Any] +) -> typing.Any: + """Async dispatcher invoked by a wrapped coroutine function's trampoline.""" + wrapper, original_copy = _WRAP_REGISTRY[target_id] + return await wrapper(original_copy, args, kwargs) + + def _wrap( owner: typing.Any, name: str, wrapper: typing.Callable[..., typing.Any], aliases: typing.Sequence[tuple[typing.Any, str]] = (), ) -> typing.Callable[..., typing.Any]: - """Replace ``owner.name`` with a callable that invokes - ``wrapper(original, args, kwargs)``, mirroring the replacement onto every - ``(alias_owner, alias_name)`` in ``aliases``. - - Async-def targets get an ``async def`` wrapper so the wrapped binding - remains a coroutine function — the C-level stack sampler walks - ``cr_await`` chains to attribute samples to the running task, and - swapping in a plain ``def`` would change the chain shape. + """Wrap ``owner.name`` so calls go through ``wrapper(original, args, kwargs)``. + + For pure-Python functions (``types.FunctionType``) we mutate the + original function's ``__code__`` in place to a tiny trampoline that + dispatches to the user wrapper. Function identity is preserved, so + pre-existing captured references (e.g. ``from X import Y`` performed + before the profiler starts) still see the wrapped behaviour — this + matches what ``ddtrace.internal.wrapping.wrap`` did via the + ``bytecode`` library, without taking on that dependency. + + For non-Python callables (Cython methods, C builtins) we fall back to + ``setattr`` and mirror onto ``aliases``. ``aliases`` is a no-op on the + identity-preserving path (both alias bindings already point at the + same mutated object) and exists only for the fallback case. """ original = getattr(owner, name) - @wraps(original) - async def _async_wrapped(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: - return await wrapper(original, args, kwargs) + if isinstance(original, types.FunctionType) and not original.__closure__: + # Identity-preserving path: mutate __code__ in place. + # We require the function to have no closure cells — the trampoline + # we generate has none, and __code__ swaps must match free-var counts. + # Class methods using super() (e.g. _GatheringFuture.__init__) carry + # a __class__ closure cell and therefore fall through to setattr. + original_copy = types.FunctionType( + original.__code__, + original.__globals__, + original.__name__, + original.__defaults__, + original.__closure__, + ) + original_copy.__kwdefaults__ = original.__kwdefaults__ + + is_async = inspect.iscoroutinefunction(original) + target_id = id(original) + _WRAP_REGISTRY[target_id] = (wrapper, original_copy) + + dispatcher_name = "_ddtrace_dispatch_wrap_async" if is_async else "_ddtrace_dispatch_wrap" + trampoline_name = original.__name__ if original.__name__.isidentifier() else "_ddtrace_trampoline" + + if is_async: + source = ( + f"async def {trampoline_name}(*args, **kwargs):\n" + f" return await {dispatcher_name}({target_id}, args, kwargs)\n" + ) + else: + source = ( + f"def {trampoline_name}(*args, **kwargs):\n return {dispatcher_name}({target_id}, args, kwargs)\n" + ) + + ns: dict[str, typing.Any] = {} + # nosec B102: source is built from a fixed template; the only + # interpolated values are an int (target_id, from id()) and a + # name validated via .isidentifier(). No untrusted input. + exec(source, ns) # nosec B102 + trampoline = ns[trampoline_name] + + # The trampoline uses LOAD_GLOBAL for dispatcher_name, resolved + # against original's module globals at call time. Inject the + # dispatcher there (idempotently — many functions in the same + # module share one entry). + original.__globals__.setdefault(dispatcher_name, globals()[dispatcher_name]) + + # Preserve filename / firstlineno / co_name for stack-trace clarity. + new_code = trampoline.__code__.replace( + co_filename=original.__code__.co_filename, + co_firstlineno=original.__code__.co_firstlineno, + co_name=original.__code__.co_name, + ) + original.__code__ = new_code + return original + # Fallback for Cython / C builtins or Python functions with closure + # cells (e.g. class methods using ``super()``). Identity isn't + # preserved here; callers that also need to patch aliased bindings + # must pass them via ``aliases``. @wraps(original) - def _sync_wrapped(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: + def wrapped(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: return wrapper(original, args, kwargs) - wrapped = _async_wrapped if inspect.iscoroutinefunction(original) else _sync_wrapped - setattr(owner, name, wrapped) for alias_owner, alias_name in aliases: setattr(alias_owner, alias_name, wrapped) From bd43047d2e0e571542881565f670263edcbf83f6 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 11:59:53 -0400 Subject: [PATCH 06/19] test: replace weak module-import sanity check with deterministic identity-preservation tests --- .../collector/test_asyncio_wrapping.py | 63 +++++++++++++------ 1 file changed, 43 insertions(+), 20 deletions(-) diff --git a/tests/profiling/collector/test_asyncio_wrapping.py b/tests/profiling/collector/test_asyncio_wrapping.py index d66d67221e2..4d50a7724b5 100644 --- a/tests/profiling/collector/test_asyncio_wrapping.py +++ b/tests/profiling/collector/test_asyncio_wrapping.py @@ -403,35 +403,58 @@ async def main(): # --------------------------------------------------------------------------- -# Sanity: pytest collects the suite even when subprocess marker is absent +# Wrap gating: with stack profiling disabled, importing _asyncio must not +# mutate asyncio.tasks.create_task. The wrapping inside the ModuleWatchdog +# hook is gated on ``config.stack.enabled and stack.is_available`` (which +# defaults to True), so the only deterministic way to assert "no wrap" is +# to disable stack explicitly in a subprocess. # --------------------------------------------------------------------------- -def test_module_imports_cleanly() -> None: - """The profiling _asyncio module must import without side-effects on - asyncio when no profiler is started. Catches accidental top-level - monkey-patches. +@pytest.mark.subprocess(err=None, env={"DD_PROFILING_STACK_ENABLED": "false"}) +def test_module_import_with_stack_disabled_does_not_wrap_create_task() -> None: + """With ``DD_PROFILING_STACK_ENABLED=false`` the asyncio ModuleWatchdog + hook must run without retargeting ``asyncio.tasks.create_task`` (no + ``__code__`` mutation, no attribute swap). Guards against a regression + where the wrap escapes its ``init_stack`` gate. """ import asyncio.tasks - pre_create_task = asyncio.tasks.create_task + pre_code = asyncio.tasks.create_task.__code__ + pre_identity = asyncio.tasks.create_task import ddtrace.profiling._asyncio # noqa: F401 - # Importing the module alone (no profiler) must not retarget create_task. - # The ModuleWatchdog hook only fires when both _asyncio and asyncio are - # present, but the wrapping is gated on init_stack which requires an - # active profiler. Here we just verify the module loads and exposes - # the expected helpers. - assert hasattr(ddtrace.profiling._asyncio, "current_task") - assert hasattr(ddtrace.profiling._asyncio, "get_running_loop") - # If this assertion fails, the module monkey-patches asyncio at import - # time, which would be a regression — wrapping should only happen when - # the profiler is started. - assert asyncio.tasks.create_task is pre_create_task or callable(asyncio.tasks.create_task) - # Note: we deliberately allow the second clause as a soft assertion - # because some test runners may have already started a profiler in a - # parent test in the same process. + assert ddtrace.profiling._asyncio.ASYNCIO_IMPORTED, "ModuleWatchdog hook must have fired" + assert asyncio.tasks.create_task is pre_identity, "create_task identity changed" + assert asyncio.tasks.create_task.__code__ is pre_code, "create_task __code__ mutated" + + +@pytest.mark.subprocess(err=None) +def test_create_task_identity_preserved_when_wrapped() -> None: + """When stack profiling IS enabled (default), the wrap path must preserve + function identity — pre-existing captured references (``from asyncio + import create_task`` style) keep pointing at the same object and see the + wrapped behaviour. The ``__code__`` swap is what makes this work; a + naive ``setattr`` replacement would fail this test. + """ + import asyncio.tasks + + pre_identity = asyncio.tasks.create_task + pre_code = asyncio.tasks.create_task.__code__ + + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + # Identity must be the same object — both module bindings share it. + assert asyncio.tasks.create_task is pre_identity, "identity changed under wrap" + assert asyncio.create_task is pre_identity, "alias diverged from canonical" + # But __code__ must have been swapped — otherwise wrapping is a no-op. + assert asyncio.tasks.create_task.__code__ is not pre_code, "__code__ was not swapped" + finally: + p.stop() # --------------------------------------------------------------------------- From 02769299cf07eece75a6fa2b80d8d7f3e9246ea5 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 12:10:23 -0400 Subject: [PATCH 07/19] test: add behavioural test for pre-cached create_task reference (uvloop scenario) --- .../collector/test_asyncio_wrapping.py | 54 ++++++++++++++----- 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/tests/profiling/collector/test_asyncio_wrapping.py b/tests/profiling/collector/test_asyncio_wrapping.py index 4d50a7724b5..78473ca35ca 100644 --- a/tests/profiling/collector/test_asyncio_wrapping.py +++ b/tests/profiling/collector/test_asyncio_wrapping.py @@ -431,28 +431,54 @@ def test_module_import_with_stack_disabled_does_not_wrap_create_task() -> None: @pytest.mark.subprocess(err=None) -def test_create_task_identity_preserved_when_wrapped() -> None: - """When stack profiling IS enabled (default), the wrap path must preserve - function identity — pre-existing captured references (``from asyncio - import create_task`` style) keep pointing at the same object and see the - wrapped behaviour. The ``__code__`` swap is what makes this work; a - naive ``setattr`` replacement would fail this test. +def test_pre_cached_reference_still_triggers_callback() -> None: + """The uvloop scenario: a library captures ``asyncio.tasks.create_task`` + at *its* import time, **before** the profiler starts. Calls made + through that cached reference must still fire ``stack.weak_link_tasks``. + + This is the user-visible property that identity-preserving wrap gives + us. A ``setattr``-style wrap would leave the cached reference pointing + at the original (un-wrapped) function, the callback would never fire, + and the C sampler would lose the parent-task link — exactly the + regression that broke uvloop CI variants before this PR. """ - import asyncio.tasks + import asyncio - pre_identity = asyncio.tasks.create_task - pre_code = asyncio.tasks.create_task.__code__ + # Capture BEFORE any ddtrace import. This is what uvloop's Cython + # modules effectively do at their own module-init time. + from asyncio.tasks import create_task as pre_cached_create_task + from ddtrace.internal.datadog.profiling import stack from ddtrace.profiling import profiler p = profiler.Profiler() p.start() try: - # Identity must be the same object — both module bindings share it. - assert asyncio.tasks.create_task is pre_identity, "identity changed under wrap" - assert asyncio.create_task is pre_identity, "alias diverged from canonical" - # But __code__ must have been swapped — otherwise wrapping is a no-op. - assert asyncio.tasks.create_task.__code__ is not pre_code, "__code__ was not swapped" + recorded_child_ids: list[int] = [] + original_weak_link = stack.weak_link_tasks + + def recorder(parent, child) -> None: + recorded_child_ids.append(id(child)) + return original_weak_link(parent, child) + + stack.weak_link_tasks = recorder + + async def child() -> None: + await asyncio.sleep(0) + + async def main() -> int: + # Call through the PRE-CACHED reference, not asyncio.create_task. + task = pre_cached_create_task(child()) + await task + return id(task) + + task_id = asyncio.run(main()) + + assert task_id in recorded_child_ids, ( + "create_task invoked through a reference cached before profiler " + "start did not trigger stack.weak_link_tasks — identity-preserving " + "wrap is broken" + ) finally: p.stop() From 55b6a1e390ed4e0d0aabb10538ab41161a28c341 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 12:22:07 -0400 Subject: [PATCH 08/19] test: drop overlapping asyncio-wrapping tests (25 -> 15) --- .../collector/test_asyncio_wrapping.py | 300 +----------------- 1 file changed, 12 insertions(+), 288 deletions(-) diff --git a/tests/profiling/collector/test_asyncio_wrapping.py b/tests/profiling/collector/test_asyncio_wrapping.py index 78473ca35ca..ba85def403f 100644 --- a/tests/profiling/collector/test_asyncio_wrapping.py +++ b/tests/profiling/collector/test_asyncio_wrapping.py @@ -32,106 +32,10 @@ # --------------------------------------------------------------------------- -# Identity / metadata invariants +# Callback-firing invariants — one test per wrap site # --------------------------------------------------------------------------- -@pytest.mark.subprocess(err=None) -def test_alias_identity_preserved_after_wrapping() -> None: - """``asyncio.X`` and ``asyncio.tasks.X`` must remain the same function - object after the profiler wraps the asyncio internals. - - This guards against a regression where setattr-style wrapping replaces - only one of two aliased bindings, leaving user code that calls - ``asyncio.create_task(...)`` unobserved. - """ - import asyncio - import asyncio.tasks # noqa: F401 - - from ddtrace.profiling import profiler - - p = profiler.Profiler() - p.start() - try: - assert asyncio.create_task is asyncio.tasks.create_task - assert asyncio.shield is asyncio.tasks.shield - assert asyncio.as_completed is asyncio.tasks.as_completed - finally: - p.stop() - - -@pytest.mark.subprocess(err=None) -def test_function_metadata_preserved_after_wrapping() -> None: - """The wrapped callables must keep ``__name__`` / ``__module__`` so - stack frames and debug output keep referring to ``asyncio.tasks.create_task`` - rather than a nameless trampoline. - """ - import asyncio - - from ddtrace.profiling import profiler - - p = profiler.Profiler() - p.start() - try: - assert asyncio.create_task.__name__ == "create_task" - assert asyncio.create_task.__module__ == "asyncio.tasks" - assert asyncio.shield.__name__ == "shield" - assert asyncio.as_completed.__name__ == "as_completed" - finally: - p.stop() - - -# --------------------------------------------------------------------------- -# Callback-firing invariants -# --------------------------------------------------------------------------- - - -@_SKIP_ON_UVLOOP -@pytest.mark.subprocess(err=None) -def test_create_task_via_both_bindings_triggers_callback() -> None: - """``asyncio.create_task(...)`` and ``asyncio.tasks.create_task(...)`` must - each fire ``stack.weak_link_tasks``. This is the alias case: a setattr - replacement that only patched one of the two bindings would silently miss - user code that uses the package-level alias. - """ - import asyncio - - from ddtrace.internal.datadog.profiling import stack - from ddtrace.profiling import profiler - - p = profiler.Profiler() - p.start() - try: - recorded: list[tuple[int, int]] = [] - original_weak_link = stack.weak_link_tasks - - def recorder(parent, child) -> None: - recorded.append((id(parent), id(child))) - return original_weak_link(parent, child) - - stack.weak_link_tasks = recorder - - async def child() -> None: - await asyncio.sleep(0) - - async def main(): - via_alias = asyncio.create_task(child()) - via_canonical = asyncio.tasks.create_task(child()) - await via_alias - await via_canonical - return id(via_alias), id(via_canonical) - - alias_id, canonical_id = asyncio.run(main()) - - recorded_child_ids = {child_id for _, child_id in recorded} - assert alias_id in recorded_child_ids, ( - "asyncio.create_task did not trigger weak_link_tasks; alias binding may be unwrapped" - ) - assert canonical_id in recorded_child_ids, "asyncio.tasks.create_task did not trigger weak_link_tasks" - finally: - p.stop() - - @pytest.mark.subprocess(err=None) def test_gather_triggers_link_tasks() -> None: """``asyncio.gather(...)`` must invoke ``stack.link_tasks`` for each child, @@ -371,37 +275,6 @@ def recorder(thread_id, loop) -> None: p.stop() -# --------------------------------------------------------------------------- -# Wrapping is in place even when the wrapper has no observable side-effect -# --------------------------------------------------------------------------- - - -@pytest.mark.subprocess(err=None) -def test_wrapped_call_returns_original_result() -> None: - """The wrapper must transparently return the value of the original call. - Belt-and-braces check that the wrapping does not corrupt return values. - """ - import asyncio - - from ddtrace.profiling import profiler - - p = profiler.Profiler() - p.start() - try: - - async def child(x: int) -> int: - await asyncio.sleep(0) - return x * 2 - - async def main(): - t = asyncio.create_task(child(21)) - return await t - - assert asyncio.run(main()) == 42 - finally: - p.stop() - - # --------------------------------------------------------------------------- # Wrap gating: with stack profiling disabled, importing _asyncio must not # mutate asyncio.tasks.create_task. The wrapping inside the ModuleWatchdog @@ -491,39 +364,16 @@ async def main() -> int: @pytest.mark.subprocess(err=None) -def test_shield_keyword_arg_form_does_not_raise() -> None: - """``asyncio.shield(arg=fut)`` (keyword form) must work — the wrapper - must not duplicate the substituted value into both args and kwargs. +def test_keyword_arg_form_does_not_double_substitute() -> None: + """The ``shield``/``as_completed`` wrappers substitute a value back into + the call. If they substituted via ``args`` when the caller used kwargs + we'd get ``TypeError: got multiple values for argument``. Covers both + APIs in one test. """ import asyncio from ddtrace.profiling import profiler - p = profiler.Profiler() - p.start() - try: - - async def child() -> int: - await asyncio.sleep(0) - return 11 - - async def main() -> int: - # Keyword form — not commonly used but valid. - return await asyncio.shield(arg=child()) - - result = asyncio.run(main()) - assert result == 11 - finally: - p.stop() - - -@pytest.mark.subprocess(err=None) -def test_as_completed_keyword_arg_form_does_not_raise() -> None: - """``asyncio.as_completed(fs=...)`` (keyword form) must work.""" - import asyncio - - from ddtrace.profiling import profiler - p = profiler.Profiler() p.start() try: @@ -532,13 +382,17 @@ async def child(x: int) -> int: await asyncio.sleep(0) return x - async def main() -> list[int]: + async def main_shield() -> int: + return await asyncio.shield(arg=child(11)) + + async def main_as_completed() -> list[int]: results: list[int] = [] for fut in asyncio.as_completed(fs=[child(0), child(1), child(2)]): results.append(await fut) return sorted(results) - assert asyncio.run(main()) == [0, 1, 2] + assert asyncio.run(main_shield()) == 11 + assert asyncio.run(main_as_completed()) == [0, 1, 2] finally: p.stop() @@ -549,57 +403,6 @@ async def main() -> list[int]: # --------------------------------------------------------------------------- -@pytest.mark.subprocess(err=None) -def test_create_task_preserves_name_kwarg() -> None: - """``create_task(coro, name='X')`` must produce a task named 'X'.""" - import asyncio - - from ddtrace.profiling import profiler - - p = profiler.Profiler() - p.start() - try: - - async def child() -> None: - await asyncio.sleep(0) - - async def main() -> str: - t = asyncio.create_task(child(), name="hello-world") - await t - return t.get_name() - - assert asyncio.run(main()) == "hello-world" - finally: - p.stop() - - -@pytest.mark.subprocess(err=None) -def test_gather_returns_results_in_order() -> None: - """``asyncio.gather(c1, c2, c3)`` must return ``[r1, r2, r3]`` in order - even after wrapping. - """ - import asyncio - - from ddtrace.profiling import profiler - - p = profiler.Profiler() - p.start() - try: - - async def child(x: int) -> int: - # Stagger completion so the ordering test is real. - await asyncio.sleep((10 - x) * 0.001) - return x - - async def main() -> list[int]: - # asyncio.gather is typed as returning tuple[T1, T2, ...]; runtime is a list. - return list(await asyncio.gather(child(1), child(2), child(3))) - - assert asyncio.run(main()) == [1, 2, 3] - finally: - p.stop() - - @pytest.mark.subprocess(err=None) def test_gather_with_return_exceptions_keeps_kwarg() -> None: """``asyncio.gather(..., return_exceptions=True)`` must return exceptions @@ -656,29 +459,6 @@ async def main(): p.stop() -@pytest.mark.subprocess(err=None) -def test_shield_returns_underlying_result() -> None: - """``await asyncio.shield(coro)`` must yield the coroutine's return value.""" - import asyncio - - from ddtrace.profiling import profiler - - p = profiler.Profiler() - p.start() - try: - - async def child() -> str: - await asyncio.sleep(0) - return "shielded-value" - - async def main() -> str: - return await asyncio.shield(child()) - - assert asyncio.run(main()) == "shielded-value" - finally: - p.stop() - - # --------------------------------------------------------------------------- # Edge cases — empty inputs and error paths must not blow up the wrappers. # --------------------------------------------------------------------------- @@ -717,29 +497,6 @@ async def main(): p.stop() -@pytest.mark.subprocess(err=None) -def test_as_completed_empty_iterator() -> None: - """``asyncio.as_completed([])`` must yield nothing and not crash.""" - import asyncio - - from ddtrace.profiling import profiler - - p = profiler.Profiler() - p.start() - try: - - async def main() -> list[int]: - results: list[int] = [] - empty: list[asyncio.Future[int]] = [] - for fut in asyncio.as_completed(empty): - results.append(await fut) - return results - - assert asyncio.run(main()) == [] - finally: - p.stop() - - @pytest.mark.subprocess(err=None) def test_create_task_propagates_exception() -> None: """If the wrapped coroutine raises, the exception must propagate via @@ -769,39 +526,6 @@ async def main(): p.stop() -@pytest.mark.subprocess(err=None) -def test_wrapped_create_task_returns_real_task_object() -> None: - """``asyncio.create_task(coro)`` must return a genuine ``asyncio.Task`` - (not a wrapper / proxy). Some downstream code calls ``Task``-specific - methods on the result. - """ - import asyncio - - from ddtrace.profiling import profiler - - p = profiler.Profiler() - p.start() - try: - - async def child() -> None: - await asyncio.sleep(0) - - async def main(): - t = asyncio.create_task(child()) - try: - # Access a Task-specific method - assert isinstance(t, asyncio.Task) - assert hasattr(t, "get_name") - assert hasattr(t, "cancel") - finally: - await t - return True - - assert asyncio.run(main()) is True - finally: - p.stop() - - # --------------------------------------------------------------------------- # TaskGroup-specific edge cases (3.11+) # --------------------------------------------------------------------------- From 78e8e9e65bfe3e4199e90a55a4a3cbbb7ea2e068 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 14:22:12 -0400 Subject: [PATCH 09/19] test: extract profiler-setup boilerplate into _asyncio_wrap_helpers --- .../collector/_asyncio_wrap_helpers.py | 59 ++++ .../collector/test_asyncio_wrapping.py | 259 ++++-------------- 2 files changed, 119 insertions(+), 199 deletions(-) create mode 100644 tests/profiling/collector/_asyncio_wrap_helpers.py diff --git a/tests/profiling/collector/_asyncio_wrap_helpers.py b/tests/profiling/collector/_asyncio_wrap_helpers.py new file mode 100644 index 00000000000..55dd22d45eb --- /dev/null +++ b/tests/profiling/collector/_asyncio_wrap_helpers.py @@ -0,0 +1,59 @@ +"""Subprocess-body helpers for the asyncio-wrap tests. + +``@pytest.mark.subprocess`` extracts only the test function's body and +executes it as a standalone script (see ``FunctionDefFinder`` in +``tests/conftest.py``). Module-level definitions in the test file are +therefore invisible to the subprocess. Helpers must live in a separate +importable module — which this file is. + +Imported as ``from tests.profiling.collector._asyncio_wrap_helpers import ...`` +inside each subprocess test body. +""" + +from __future__ import annotations + +from contextlib import contextmanager +from typing import Any +from typing import Callable +from typing import Iterator + + +@contextmanager +def started_profiler() -> Iterator[Any]: + """Context manager that starts the profiler on entry and stops it on + exit, even on assertion failure. Yields the Profiler instance. + """ + from ddtrace.profiling import profiler + + p = profiler.Profiler() + p.start() + try: + yield p + finally: + p.stop() + + +@contextmanager +def captured_link_calls(attr: str) -> Iterator[list[int]]: + """Replace ``ddtrace.internal.datadog.profiling.stack.`` with a + recorder that captures the child task id of each call, yielding the + growing list. Restores the original on exit. + + ``attr`` is one of ``"link_tasks"`` / ``"weak_link_tasks"`` / + ``"track_asyncio_loop"`` — anything with a ``(_, second_arg)`` shape + where the second arg is the thing we want to identify by id. + """ + from ddtrace.internal.datadog.profiling import stack + + original: Callable[..., Any] = getattr(stack, attr) + recorded: list[int] = [] + + def recorder(first: Any, second: Any) -> Any: + recorded.append(id(second)) + return original(first, second) + + setattr(stack, attr, recorder) + try: + yield recorded + finally: + setattr(stack, attr, original) diff --git a/tests/profiling/collector/test_asyncio_wrapping.py b/tests/profiling/collector/test_asyncio_wrapping.py index ba85def403f..3f12d4f6125 100644 --- a/tests/profiling/collector/test_asyncio_wrapping.py +++ b/tests/profiling/collector/test_asyncio_wrapping.py @@ -43,20 +43,10 @@ def test_gather_triggers_link_tasks() -> None: """ import asyncio - from ddtrace.internal.datadog.profiling import stack - from ddtrace.profiling import profiler - - p = profiler.Profiler() - p.start() - try: - recorded_children: list[int] = [] - original_link = stack.link_tasks - - def recorder(parent, child) -> None: - recorded_children.append(id(child)) - return original_link(parent, child) + from tests.profiling.collector._asyncio_wrap_helpers import captured_link_calls + from tests.profiling.collector._asyncio_wrap_helpers import started_profiler - stack.link_tasks = recorder + with started_profiler(), captured_link_calls("link_tasks") as recorded: async def child() -> int: await asyncio.sleep(0) @@ -70,36 +60,21 @@ async def main(): t1_id, t2_id = asyncio.run(main()) - assert t1_id in recorded_children, "gather did not link first child" - assert t2_id in recorded_children, "gather did not link second child" - finally: - p.stop() + assert t1_id in recorded, "gather did not link first child" + assert t2_id in recorded, "gather did not link second child" @pytest.mark.subprocess(err=None) def test_shield_triggers_link_tasks() -> None: """``asyncio.shield(awaitable)`` must invoke ``stack.link_tasks`` for the - shielded future. The wrapper additionally wraps the awaitable into a - ``Future`` via ``ensure_future`` and substitutes it back into the call — - we only check that link_tasks fires; the substitution correctness is - covered by the existing ``test_asyncio_shield.py`` integration test. + shielded future. """ import asyncio - from ddtrace.internal.datadog.profiling import stack - from ddtrace.profiling import profiler - - p = profiler.Profiler() - p.start() - try: - recorded: list[int] = [] - original_link = stack.link_tasks + from tests.profiling.collector._asyncio_wrap_helpers import captured_link_calls + from tests.profiling.collector._asyncio_wrap_helpers import started_profiler - def recorder(parent, child) -> None: - recorded.append(id(child)) - return original_link(parent, child) - - stack.link_tasks = recorder + with started_profiler(), captured_link_calls("link_tasks") as recorded: async def child() -> int: await asyncio.sleep(0) @@ -108,11 +83,8 @@ async def child() -> int: async def main() -> int: return await asyncio.shield(child()) - result = asyncio.run(main()) - assert result == 7 + assert asyncio.run(main()) == 7 assert len(recorded) >= 1, "shield did not fire link_tasks" - finally: - p.stop() @pytest.mark.subprocess(err=None) @@ -122,39 +94,23 @@ def test_as_completed_triggers_link_tasks_per_child() -> None: """ import asyncio - from ddtrace.internal.datadog.profiling import stack - from ddtrace.profiling import profiler + from tests.profiling.collector._asyncio_wrap_helpers import captured_link_calls + from tests.profiling.collector._asyncio_wrap_helpers import started_profiler - p = profiler.Profiler() - p.start() - try: - recorded: list[int] = [] - original_link = stack.link_tasks - - def recorder(parent, child) -> None: - recorded.append(id(child)) - return original_link(parent, child) - - stack.link_tasks = recorder + with started_profiler(), captured_link_calls("link_tasks") as recorded: async def child(x: int) -> int: await asyncio.sleep(0) return x async def main() -> list[int]: - coros = [child(i) for i in range(3)] results = [] - for fut in asyncio.as_completed(coros): + for fut in asyncio.as_completed([child(i) for i in range(3)]): results.append(await fut) return sorted(results) - results = asyncio.run(main()) - assert results == [0, 1, 2] - # as_completed wraps each coro into a Future via ensure_future and - # links each one — so we expect at least 3 callbacks. - assert len(recorded) >= 3, "as_completed fired link_tasks %d times, expected >= 3" % len(recorded) - finally: - p.stop() + assert asyncio.run(main()) == [0, 1, 2] + assert len(recorded) >= 3, f"as_completed fired link_tasks {len(recorded)} times, expected >= 3" @pytest.mark.subprocess(err=None) @@ -164,20 +120,10 @@ def test_wait_triggers_link_tasks_per_future() -> None: """ import asyncio - from ddtrace.internal.datadog.profiling import stack - from ddtrace.profiling import profiler + from tests.profiling.collector._asyncio_wrap_helpers import captured_link_calls + from tests.profiling.collector._asyncio_wrap_helpers import started_profiler - p = profiler.Profiler() - p.start() - try: - recorded: list[int] = [] - original_link = stack.link_tasks - - def recorder(parent, child) -> None: - recorded.append(id(child)) - return original_link(parent, child) - - stack.link_tasks = recorder + with started_profiler(), captured_link_calls("link_tasks") as recorded: async def child() -> None: await asyncio.sleep(0) @@ -189,12 +135,8 @@ async def main(): return id(t1), id(t2) t1_id, t2_id = asyncio.run(main()) - # wait may also fire gather-style link_tasks on the inner _GatheringFuture - # — we only require both leaf futures show up. assert t1_id in recorded, "wait did not link first future" assert t2_id in recorded, "wait did not link second future" - finally: - p.stop() @pytest.mark.skipif(sys.version_info < (3, 11), reason="TaskGroup is Python 3.11+") @@ -205,20 +147,10 @@ def test_taskgroup_triggers_link_tasks() -> None: """ import asyncio - from ddtrace.internal.datadog.profiling import stack - from ddtrace.profiling import profiler - - p = profiler.Profiler() - p.start() - try: - recorded: list[int] = [] - original_link = stack.link_tasks - - def recorder(parent, child) -> None: - recorded.append(id(child)) - return original_link(parent, child) + from tests.profiling.collector._asyncio_wrap_helpers import captured_link_calls + from tests.profiling.collector._asyncio_wrap_helpers import started_profiler - stack.link_tasks = recorder + with started_profiler(), captured_link_calls("link_tasks") as recorded: async def child(x: int) -> int: await asyncio.sleep(0) @@ -226,19 +158,16 @@ async def child(x: int) -> int: async def main() -> list[int]: results: list[int] = [] - # mypy doesn't know about TaskGroup on older type stubs and the - # skipif gate above means this code only runs on 3.11+. + # mypy doesn't know about TaskGroup on older type stubs; the + # skipif gate above means this only runs on 3.11+. async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined] tasks = [tg.create_task(child(i)) for i in range(3)] for t in tasks: results.append(t.result()) return sorted(results) - results = asyncio.run(main()) - assert results == [0, 1, 2] - assert len(recorded) >= 3, "TaskGroup.create_task fired link_tasks %d times, expected >= 3" % len(recorded) - finally: - p.stop() + assert asyncio.run(main()) == [0, 1, 2] + assert len(recorded) >= 3, f"TaskGroup.create_task fired link_tasks {len(recorded)} times, expected >= 3" @pytest.mark.subprocess(err=None) @@ -248,22 +177,10 @@ def test_set_event_loop_triggers_track_asyncio_loop() -> None: """ import asyncio - from ddtrace.internal.datadog.profiling import stack - from ddtrace.profiling import profiler - - p = profiler.Profiler() - p.start() - try: - recorded: list[int] = [] - original_track = stack.track_asyncio_loop - - def recorder(thread_id, loop) -> None: - if loop is not None: - recorded.append(id(loop)) - return original_track(thread_id, loop) - - stack.track_asyncio_loop = recorder + from tests.profiling.collector._asyncio_wrap_helpers import captured_link_calls + from tests.profiling.collector._asyncio_wrap_helpers import started_profiler + with started_profiler(), captured_link_calls("track_asyncio_loop") as recorded: loop = asyncio.new_event_loop() try: asyncio.set_event_loop(loop) @@ -271,8 +188,6 @@ def recorder(thread_id, loop) -> None: finally: asyncio.set_event_loop(None) loop.close() - finally: - p.stop() # --------------------------------------------------------------------------- @@ -321,20 +236,10 @@ def test_pre_cached_reference_still_triggers_callback() -> None: # modules effectively do at their own module-init time. from asyncio.tasks import create_task as pre_cached_create_task - from ddtrace.internal.datadog.profiling import stack - from ddtrace.profiling import profiler - - p = profiler.Profiler() - p.start() - try: - recorded_child_ids: list[int] = [] - original_weak_link = stack.weak_link_tasks + from tests.profiling.collector._asyncio_wrap_helpers import captured_link_calls + from tests.profiling.collector._asyncio_wrap_helpers import started_profiler - def recorder(parent, child) -> None: - recorded_child_ids.append(id(child)) - return original_weak_link(parent, child) - - stack.weak_link_tasks = recorder + with started_profiler(), captured_link_calls("weak_link_tasks") as recorded: async def child() -> None: await asyncio.sleep(0) @@ -347,13 +252,11 @@ async def main() -> int: task_id = asyncio.run(main()) - assert task_id in recorded_child_ids, ( + assert task_id in recorded, ( "create_task invoked through a reference cached before profiler " "start did not trigger stack.weak_link_tasks — identity-preserving " "wrap is broken" ) - finally: - p.stop() # --------------------------------------------------------------------------- @@ -372,11 +275,9 @@ def test_keyword_arg_form_does_not_double_substitute() -> None: """ import asyncio - from ddtrace.profiling import profiler + from tests.profiling.collector._asyncio_wrap_helpers import started_profiler - p = profiler.Profiler() - p.start() - try: + with started_profiler(): async def child(x: int) -> int: await asyncio.sleep(0) @@ -393,8 +294,6 @@ async def main_as_completed() -> list[int]: assert asyncio.run(main_shield()) == 11 assert asyncio.run(main_as_completed()) == [0, 1, 2] - finally: - p.stop() # --------------------------------------------------------------------------- @@ -410,11 +309,9 @@ def test_gather_with_return_exceptions_keeps_kwarg() -> None: """ import asyncio - from ddtrace.profiling import profiler + from tests.profiling.collector._asyncio_wrap_helpers import started_profiler - p = profiler.Profiler() - p.start() - try: + with started_profiler(): async def good() -> int: return 1 @@ -427,8 +324,6 @@ async def main(): return [type(r).__name__ if isinstance(r, BaseException) else r for r in results] assert asyncio.run(main()) == [1, "ValueError"] - finally: - p.stop() @pytest.mark.subprocess(err=None) @@ -436,11 +331,9 @@ def test_wait_returns_done_pending_tuple() -> None: """``asyncio.wait`` must still return a ``(done, pending)`` tuple.""" import asyncio - from ddtrace.profiling import profiler + from tests.profiling.collector._asyncio_wrap_helpers import started_profiler - p = profiler.Profiler() - p.start() - try: + with started_profiler(): async def child(x: int) -> int: await asyncio.sleep(0) @@ -455,8 +348,6 @@ async def main(): n_done, n_pending = asyncio.run(main()) assert n_done == 2 assert n_pending == 0 - finally: - p.stop() # --------------------------------------------------------------------------- @@ -471,30 +362,16 @@ def test_gather_empty_does_not_link() -> None: """ import asyncio - from ddtrace.internal.datadog.profiling import stack - from ddtrace.profiling import profiler + from tests.profiling.collector._asyncio_wrap_helpers import captured_link_calls + from tests.profiling.collector._asyncio_wrap_helpers import started_profiler - p = profiler.Profiler() - p.start() - try: - recorded: list[int] = [] - original_link = stack.link_tasks - - def recorder(parent, child) -> None: - recorded.append(id(child)) - return original_link(parent, child) - - stack.link_tasks = recorder + with started_profiler(), captured_link_calls("link_tasks") as recorded: async def main(): return await asyncio.gather() - result = asyncio.run(main()) - assert result == [] - # Empty gather → no children → no link_tasks calls + assert asyncio.run(main()) == [] assert recorded == [], f"Empty gather fired link_tasks: {recorded}" - finally: - p.stop() @pytest.mark.subprocess(err=None) @@ -504,11 +381,9 @@ def test_create_task_propagates_exception() -> None: """ import asyncio - from ddtrace.profiling import profiler + from tests.profiling.collector._asyncio_wrap_helpers import started_profiler - p = profiler.Profiler() - p.start() - try: + with started_profiler(): async def child() -> int: raise RuntimeError("expected boom") @@ -522,8 +397,6 @@ async def main(): return None assert asyncio.run(main()) == "expected boom" - finally: - p.stop() # --------------------------------------------------------------------------- @@ -540,11 +413,9 @@ def test_taskgroup_exception_propagates_through_wrapper() -> None: """ import asyncio - from ddtrace.profiling import profiler + from tests.profiling.collector._asyncio_wrap_helpers import started_profiler - p = profiler.Profiler() - p.start() - try: + with started_profiler(): async def child_ok() -> int: await asyncio.sleep(0) @@ -559,8 +430,6 @@ async def main(): tg.create_task(child_ok()) tg.create_task(child_bad()) except BaseException as outer: - # Could be ExceptionGroup (3.11+) or BaseExceptionGroup; - # check the structure includes our ValueError. exc_strs = [] def collect(e): @@ -574,10 +443,7 @@ def collect(e): return exc_strs return [] - result = asyncio.run(main()) - assert ("ValueError", "expected") in result, f"Unexpected: {result}" - finally: - p.stop() + assert ("ValueError", "expected") in asyncio.run(main()) # --------------------------------------------------------------------------- @@ -594,35 +460,30 @@ def test_wrapping_persists_across_profiler_restart() -> None: """ import asyncio - from ddtrace.internal.datadog.profiling import stack from ddtrace.profiling import profiler + from tests.profiling.collector._asyncio_wrap_helpers import captured_link_calls + # First cycle: start + stop. p = profiler.Profiler() p.start() p.stop() + # Second cycle: wraps should still be in place. p2 = profiler.Profiler() p2.start() try: - recorded: list[int] = [] - original = stack.weak_link_tasks + with captured_link_calls("weak_link_tasks") as recorded: - def recorder(parent, child) -> None: - recorded.append(id(child)) - return original(parent, child) + async def child() -> None: + await asyncio.sleep(0) - stack.weak_link_tasks = recorder - - async def child() -> None: - await asyncio.sleep(0) - - async def main(): - t = asyncio.create_task(child()) - await t - return id(t) + async def main(): + t = asyncio.create_task(child()) + await t + return id(t) - task_id = asyncio.run(main()) - assert task_id in recorded, "Wrapping did not survive profiler restart" + task_id = asyncio.run(main()) + assert task_id in recorded, "Wrapping did not survive profiler restart" finally: p2.stop() From b7cc82405fd5a94c635d3bcc0edcb0343082aac4 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 15:30:57 -0400 Subject: [PATCH 10/19] refactor(profiling): replace exec-based trampoline with code.replace() template --- ddtrace/profiling/_asyncio.py | 105 +++++++++++++++++----------------- 1 file changed, 53 insertions(+), 52 deletions(-) diff --git a/ddtrace/profiling/_asyncio.py b/ddtrace/profiling/_asyncio.py index 4037613959e..adb471a1413 100644 --- a/ddtrace/profiling/_asyncio.py +++ b/ddtrace/profiling/_asyncio.py @@ -25,26 +25,44 @@ # Trampoline dispatch table. -# Key: id(original) of the wrapped function (kept alive by the module/class -# attribute pointing at it; its identity is preserved across wraps). +# Key: ``id(code)`` of the trampoline-clone we grafted onto the wrapped +# function. Each wrap site gets a fresh code object via ``code.replace()`` +# so ids are unique. The code object is kept alive by ``original.__code__``. # Value: (user_wrapper, original_copy) _WRAP_REGISTRY: dict[int, tuple[typing.Callable[..., typing.Any], types.FunctionType]] = {} -def _ddtrace_dispatch_wrap(target_id: int, args: tuple[typing.Any, ...], kwargs: dict[str, typing.Any]) -> typing.Any: - """Sync dispatcher invoked by a wrapped function's trampoline bytecode.""" - wrapper, original_copy = _WRAP_REGISTRY[target_id] +def _ddtrace_dispatch_wrap(args: tuple[typing.Any, ...], kwargs: dict[str, typing.Any]) -> typing.Any: + """Sync dispatcher — called by the sync trampoline template's bytecode. + + Identifies which wrap site is calling by reading the caller frame's + code-object id. Each wrapped function has a unique cloned trampoline + code object (via ``code.replace()``), so ``id(f_code)`` is a stable + per-wrap-site key. + """ + wrapper, original_copy = _WRAP_REGISTRY[id(sys._getframe(1).f_code)] return wrapper(original_copy, args, kwargs) -async def _ddtrace_dispatch_wrap_async( - target_id: int, args: tuple[typing.Any, ...], kwargs: dict[str, typing.Any] -) -> typing.Any: - """Async dispatcher invoked by a wrapped coroutine function's trampoline.""" - wrapper, original_copy = _WRAP_REGISTRY[target_id] +async def _ddtrace_dispatch_wrap_async(args: tuple[typing.Any, ...], kwargs: dict[str, typing.Any]) -> typing.Any: + """Async dispatcher for coroutine-function wrap sites — see sync variant.""" + wrapper, original_copy = _WRAP_REGISTRY[id(sys._getframe(1).f_code)] return await wrapper(original_copy, args, kwargs) +# Template trampolines. Their ``__code__`` is the bytecode we reuse: per +# wrap site we ``replace()`` the template's code object to stamp the +# original's filename / lineno / co_name, then graft it onto the original +# function. Each ``.replace()`` returns a fresh code object, giving the +# dispatcher's ``id(f_code)`` lookup a unique key per wrap site. +def _ddtrace_trampoline_sync(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: + return _ddtrace_dispatch_wrap(args, kwargs) + + +async def _ddtrace_trampoline_async(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: + return await _ddtrace_dispatch_wrap_async(args, kwargs) + + def _wrap( owner: typing.Any, name: str, @@ -53,13 +71,13 @@ def _wrap( ) -> typing.Callable[..., typing.Any]: """Wrap ``owner.name`` so calls go through ``wrapper(original, args, kwargs)``. - For pure-Python functions (``types.FunctionType``) we mutate the - original function's ``__code__`` in place to a tiny trampoline that - dispatches to the user wrapper. Function identity is preserved, so - pre-existing captured references (e.g. ``from X import Y`` performed - before the profiler starts) still see the wrapped behaviour — this - matches what ``ddtrace.internal.wrapping.wrap`` did via the - ``bytecode`` library, without taking on that dependency. + For pure-Python functions (``types.FunctionType``) we clone a + template trampoline's bytecode via ``code.replace()`` and graft it + onto the original function in place. Function identity is preserved, + so pre-existing captured references (e.g. ``from X import Y`` + performed before the profiler starts) still see the wrapped + behaviour — this matches what ``ddtrace.internal.wrapping.wrap`` did + via the ``bytecode`` library, without taking on that dependency. For non-Python callables (Cython methods, C builtins) we fall back to ``setattr`` and mirror onto ``aliases``. ``aliases`` is a no-op on the @@ -70,10 +88,10 @@ def _wrap( if isinstance(original, types.FunctionType) and not original.__closure__: # Identity-preserving path: mutate __code__ in place. - # We require the function to have no closure cells — the trampoline - # we generate has none, and __code__ swaps must match free-var counts. - # Class methods using super() (e.g. _GatheringFuture.__init__) carry - # a __class__ closure cell and therefore fall through to setattr. + # We require the function to have no closure cells — the template + # trampoline has none, and ``__code__`` swaps must match free-var + # counts. Class methods using super() (e.g. _GatheringFuture.__init__) + # carry a __class__ closure cell and therefore fall through to setattr. original_copy = types.FunctionType( original.__code__, original.__globals__, @@ -84,41 +102,24 @@ def _wrap( original_copy.__kwdefaults__ = original.__kwdefaults__ is_async = inspect.iscoroutinefunction(original) - target_id = id(original) - _WRAP_REGISTRY[target_id] = (wrapper, original_copy) - - dispatcher_name = "_ddtrace_dispatch_wrap_async" if is_async else "_ddtrace_dispatch_wrap" - trampoline_name = original.__name__ if original.__name__.isidentifier() else "_ddtrace_trampoline" - - if is_async: - source = ( - f"async def {trampoline_name}(*args, **kwargs):\n" - f" return await {dispatcher_name}({target_id}, args, kwargs)\n" - ) - else: - source = ( - f"def {trampoline_name}(*args, **kwargs):\n return {dispatcher_name}({target_id}, args, kwargs)\n" - ) - - ns: dict[str, typing.Any] = {} - # nosec B102: source is built from a fixed template; the only - # interpolated values are an int (target_id, from id()) and a - # name validated via .isidentifier(). No untrusted input. - exec(source, ns) # nosec B102 - trampoline = ns[trampoline_name] - - # The trampoline uses LOAD_GLOBAL for dispatcher_name, resolved - # against original's module globals at call time. Inject the - # dispatcher there (idempotently — many functions in the same - # module share one entry). - original.__globals__.setdefault(dispatcher_name, globals()[dispatcher_name]) - - # Preserve filename / firstlineno / co_name for stack-trace clarity. - new_code = trampoline.__code__.replace( + template = _ddtrace_trampoline_async if is_async else _ddtrace_trampoline_sync + dispatcher = _ddtrace_dispatch_wrap_async if is_async else _ddtrace_dispatch_wrap + + # Clone the template's bytecode and stamp original's metadata for + # stack-trace clarity. ``replace()`` always returns a new code + # object, so ``id(new_code)`` is unique per wrap site. + new_code = template.__code__.replace( co_filename=original.__code__.co_filename, co_firstlineno=original.__code__.co_firstlineno, co_name=original.__code__.co_name, ) + _WRAP_REGISTRY[id(new_code)] = (wrapper, original_copy) + + # The trampoline bytecode uses LOAD_GLOBAL on the dispatcher name, + # resolved against original's module globals at call time. Inject + # it there once per module (idempotent across wrap sites). + original.__globals__.setdefault(dispatcher.__name__, dispatcher) + original.__code__ = new_code return original From 6e6282a0506ea73c971d39a9b6e5fa511a86fdb7 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 16:21:25 -0400 Subject: [PATCH 11/19] fix(profiling): preserve signature metadata via __wrapped__ in identity-preserving wrap --- ddtrace/profiling/_asyncio.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/ddtrace/profiling/_asyncio.py b/ddtrace/profiling/_asyncio.py index adb471a1413..283e60231ef 100644 --- a/ddtrace/profiling/_asyncio.py +++ b/ddtrace/profiling/_asyncio.py @@ -108,11 +108,16 @@ def _wrap( # Clone the template's bytecode and stamp original's metadata for # stack-trace clarity. ``replace()`` always returns a new code # object, so ``id(new_code)`` is unique per wrap site. - new_code = template.__code__.replace( + replace_kwargs: dict[str, typing.Any] = dict( co_filename=original.__code__.co_filename, co_firstlineno=original.__code__.co_firstlineno, co_name=original.__code__.co_name, ) + # co_qualname is 3.11+; preserve it when available so qualified + # stack frames stay accurate. + if hasattr(original.__code__, "co_qualname"): + replace_kwargs["co_qualname"] = original.__code__.co_qualname + new_code = template.__code__.replace(**replace_kwargs) _WRAP_REGISTRY[id(new_code)] = (wrapper, original_copy) # The trampoline bytecode uses LOAD_GLOBAL on the dispatcher name, @@ -121,6 +126,12 @@ def _wrap( original.__globals__.setdefault(dispatcher.__name__, dispatcher) original.__code__ = new_code + # Preserve introspection: the trampoline's (*args, **kwargs) shape + # would otherwise leak through to ``inspect.signature(original)`` + # and break callers that adapt to the original signature (FastAPI, + # validators, etc.). ``inspect.signature`` follows ``__wrapped__`` + # to return the underlying callable's signature. + original.__wrapped__ = original_copy # type: ignore[attr-defined] return original # Fallback for Cython / C builtins or Python functions with closure From f69ebc5c42ee4ca097024927941cc9798bb71bbf Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 16:24:42 -0400 Subject: [PATCH 12/19] test(profiling): cover inspect.signature preservation; assert stack.is_available in helper --- .../collector/_asyncio_wrap_helpers.py | 6 ++++ .../collector/test_asyncio_wrapping.py | 30 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/tests/profiling/collector/_asyncio_wrap_helpers.py b/tests/profiling/collector/_asyncio_wrap_helpers.py index 55dd22d45eb..b3ab1a1f94a 100644 --- a/tests/profiling/collector/_asyncio_wrap_helpers.py +++ b/tests/profiling/collector/_asyncio_wrap_helpers.py @@ -42,9 +42,15 @@ def captured_link_calls(attr: str) -> Iterator[list[int]]: ``attr`` is one of ``"link_tasks"`` / ``"weak_link_tasks"`` / ``"track_asyncio_loop"`` — anything with a ``(_, second_arg)`` shape where the second arg is the thing we want to identify by id. + + Raises ``AssertionError`` (with ``stack.failure_msg``) if the native + stack extension is unavailable — surfaces the root cause clearly + rather than an opaque ``AttributeError`` on ``getattr(stack, attr)``. """ from ddtrace.internal.datadog.profiling import stack + assert stack.is_available, stack.failure_msg + original: Callable[..., Any] = getattr(stack, attr) recorded: list[int] = [] diff --git a/tests/profiling/collector/test_asyncio_wrapping.py b/tests/profiling/collector/test_asyncio_wrapping.py index 3f12d4f6125..81a404b9f25 100644 --- a/tests/profiling/collector/test_asyncio_wrapping.py +++ b/tests/profiling/collector/test_asyncio_wrapping.py @@ -190,6 +190,36 @@ def test_set_event_loop_triggers_track_asyncio_loop() -> None: loop.close() +# --------------------------------------------------------------------------- +# Introspection metadata — the trampoline's (*args, **kwargs) shape must +# not leak through; downstream libraries (FastAPI, validators, …) +# introspect asyncio API signatures and break if we report something +# other than the real shape. +# --------------------------------------------------------------------------- + + +@pytest.mark.subprocess(err=None) +def test_wrap_preserves_inspect_signature() -> None: + """``inspect.signature(asyncio.tasks.create_task)`` after profiler start + must match the unwrapped signature. The trampoline carries a + ``(*args, **kwargs)`` shape; ``__wrapped__`` set on the original lets + ``inspect.signature`` recover the real argument metadata. + """ + import asyncio + import inspect + + pre_sig = inspect.signature(asyncio.tasks.create_task) + + from tests.profiling.collector._asyncio_wrap_helpers import started_profiler + + with started_profiler(): + post_sig = inspect.signature(asyncio.tasks.create_task) + assert str(post_sig) == str(pre_sig), f"signature regressed under wrap: {pre_sig} -> {post_sig}" + assert hasattr(asyncio.tasks.create_task, "__wrapped__"), ( + "__wrapped__ must be set so inspect.signature() can recover the original signature" + ) + + # --------------------------------------------------------------------------- # Wrap gating: with stack profiling disabled, importing _asyncio must not # mutate asyncio.tasks.create_task. The wrapping inside the ModuleWatchdog From cd62165b860bdfb677d8615d594a1c62c04c6af1 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 16:48:37 -0400 Subject: [PATCH 13/19] use template.__code__.replace --- ddtrace/profiling/_asyncio.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/ddtrace/profiling/_asyncio.py b/ddtrace/profiling/_asyncio.py index 283e60231ef..adb471a1413 100644 --- a/ddtrace/profiling/_asyncio.py +++ b/ddtrace/profiling/_asyncio.py @@ -108,16 +108,11 @@ def _wrap( # Clone the template's bytecode and stamp original's metadata for # stack-trace clarity. ``replace()`` always returns a new code # object, so ``id(new_code)`` is unique per wrap site. - replace_kwargs: dict[str, typing.Any] = dict( + new_code = template.__code__.replace( co_filename=original.__code__.co_filename, co_firstlineno=original.__code__.co_firstlineno, co_name=original.__code__.co_name, ) - # co_qualname is 3.11+; preserve it when available so qualified - # stack frames stay accurate. - if hasattr(original.__code__, "co_qualname"): - replace_kwargs["co_qualname"] = original.__code__.co_qualname - new_code = template.__code__.replace(**replace_kwargs) _WRAP_REGISTRY[id(new_code)] = (wrapper, original_copy) # The trampoline bytecode uses LOAD_GLOBAL on the dispatcher name, @@ -126,12 +121,6 @@ def _wrap( original.__globals__.setdefault(dispatcher.__name__, dispatcher) original.__code__ = new_code - # Preserve introspection: the trampoline's (*args, **kwargs) shape - # would otherwise leak through to ``inspect.signature(original)`` - # and break callers that adapt to the original signature (FastAPI, - # validators, etc.). ``inspect.signature`` follows ``__wrapped__`` - # to return the underlying callable's signature. - original.__wrapped__ = original_copy # type: ignore[attr-defined] return original # Fallback for Cython / C builtins or Python functions with closure From c377bdf6051662f31360b67fc113dd49bc70902c Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 17:14:50 -0400 Subject: [PATCH 14/19] refactor(profiling): collapse trampoline+dispatcher into one function per sync/async --- ddtrace/profiling/_asyncio.py | 58 +++++++++++++++-------------------- 1 file changed, 25 insertions(+), 33 deletions(-) diff --git a/ddtrace/profiling/_asyncio.py b/ddtrace/profiling/_asyncio.py index adb471a1413..fa99a8ba131 100644 --- a/ddtrace/profiling/_asyncio.py +++ b/ddtrace/profiling/_asyncio.py @@ -24,43 +24,27 @@ ASYNCIO_IMPORTED: bool = False -# Trampoline dispatch table. +# Wrap registry, looked up at call time by the trampoline's bytecode. # Key: ``id(code)`` of the trampoline-clone we grafted onto the wrapped # function. Each wrap site gets a fresh code object via ``code.replace()`` # so ids are unique. The code object is kept alive by ``original.__code__``. # Value: (user_wrapper, original_copy) -_WRAP_REGISTRY: dict[int, tuple[typing.Callable[..., typing.Any], types.FunctionType]] = {} +_ddtrace_wrap_registry: dict[int, tuple[typing.Callable[..., typing.Any], types.FunctionType]] = {} -def _ddtrace_dispatch_wrap(args: tuple[typing.Any, ...], kwargs: dict[str, typing.Any]) -> typing.Any: - """Sync dispatcher — called by the sync trampoline template's bytecode. - - Identifies which wrap site is calling by reading the caller frame's - code-object id. Each wrapped function has a unique cloned trampoline - code object (via ``code.replace()``), so ``id(f_code)`` is a stable - per-wrap-site key. - """ - wrapper, original_copy = _WRAP_REGISTRY[id(sys._getframe(1).f_code)] - return wrapper(original_copy, args, kwargs) - - -async def _ddtrace_dispatch_wrap_async(args: tuple[typing.Any, ...], kwargs: dict[str, typing.Any]) -> typing.Any: - """Async dispatcher for coroutine-function wrap sites — see sync variant.""" - wrapper, original_copy = _WRAP_REGISTRY[id(sys._getframe(1).f_code)] - return await wrapper(original_copy, args, kwargs) - - -# Template trampolines. Their ``__code__`` is the bytecode we reuse: per -# wrap site we ``replace()`` the template's code object to stamp the -# original's filename / lineno / co_name, then graft it onto the original -# function. Each ``.replace()`` returns a fresh code object, giving the -# dispatcher's ``id(f_code)`` lookup a unique key per wrap site. +# Template trampolines: their ``__code__`` is what we clone via +# ``CodeType.replace()`` per wrap site and graft onto the original +# function. Each clone has a unique ``id(code)`` which the trampoline +# itself reads via ``sys._getframe(0).f_code`` to look up its wrapper. +# Two templates (sync / async) cover all wrap targets. def _ddtrace_trampoline_sync(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: - return _ddtrace_dispatch_wrap(args, kwargs) + wrapper, original_copy = _ddtrace_wrap_registry[id(sys._getframe(0).f_code)] + return wrapper(original_copy, args, kwargs) async def _ddtrace_trampoline_async(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: - return await _ddtrace_dispatch_wrap_async(args, kwargs) + wrapper, original_copy = _ddtrace_wrap_registry[id(sys._getframe(0).f_code)] + return await wrapper(original_copy, args, kwargs) def _wrap( @@ -103,7 +87,6 @@ def _wrap( is_async = inspect.iscoroutinefunction(original) template = _ddtrace_trampoline_async if is_async else _ddtrace_trampoline_sync - dispatcher = _ddtrace_dispatch_wrap_async if is_async else _ddtrace_dispatch_wrap # Clone the template's bytecode and stamp original's metadata for # stack-trace clarity. ``replace()`` always returns a new code @@ -113,14 +96,23 @@ def _wrap( co_firstlineno=original.__code__.co_firstlineno, co_name=original.__code__.co_name, ) - _WRAP_REGISTRY[id(new_code)] = (wrapper, original_copy) + _ddtrace_wrap_registry[id(new_code)] = (wrapper, original_copy) - # The trampoline bytecode uses LOAD_GLOBAL on the dispatcher name, - # resolved against original's module globals at call time. Inject - # it there once per module (idempotent across wrap sites). - original.__globals__.setdefault(dispatcher.__name__, dispatcher) + # The trampoline bytecode does LOAD_GLOBAL on ``_ddtrace_wrap_registry`` + # and ``sys``, resolved against original's module globals at call + # time. Inject both (sys is normally already imported by asyncio + # internals, but ``setdefault`` is harmless and covers exotic + # targets). Idempotent across wrap sites in the same module. + original.__globals__.setdefault("_ddtrace_wrap_registry", _ddtrace_wrap_registry) + original.__globals__.setdefault("sys", sys) original.__code__ = new_code + # Preserve introspection: the trampoline's (*args, **kwargs) shape + # would otherwise leak through to ``inspect.signature(original)`` + # and break callers that adapt to the original signature (FastAPI, + # validators, etc.). ``inspect.signature`` follows ``__wrapped__`` + # to return the underlying callable's signature. + original.__wrapped__ = original_copy # type: ignore[attr-defined] return original # Fallback for Cython / C builtins or Python functions with closure From 6a1bb45375d19540295d2aa0852815a8e010f9b5 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 17:18:42 -0400 Subject: [PATCH 15/19] refactor(profiling): compact comments and tighten types in _wrap --- ddtrace/profiling/_asyncio.py | 84 +++++++++++++---------------------- 1 file changed, 32 insertions(+), 52 deletions(-) diff --git a/ddtrace/profiling/_asyncio.py b/ddtrace/profiling/_asyncio.py index fa99a8ba131..b41999f6c4e 100644 --- a/ddtrace/profiling/_asyncio.py +++ b/ddtrace/profiling/_asyncio.py @@ -24,19 +24,19 @@ ASYNCIO_IMPORTED: bool = False -# Wrap registry, looked up at call time by the trampoline's bytecode. -# Key: ``id(code)`` of the trampoline-clone we grafted onto the wrapped -# function. Each wrap site gets a fresh code object via ``code.replace()`` -# so ids are unique. The code object is kept alive by ``original.__code__``. -# Value: (user_wrapper, original_copy) -_ddtrace_wrap_registry: dict[int, tuple[typing.Callable[..., typing.Any], types.FunctionType]] = {} - - -# Template trampolines: their ``__code__`` is what we clone via -# ``CodeType.replace()`` per wrap site and graft onto the original -# function. Each clone has a unique ``id(code)`` which the trampoline -# itself reads via ``sys._getframe(0).f_code`` to look up its wrapper. -# Two templates (sync / async) cover all wrap targets. +_WrapperFn = typing.Callable[ + [types.FunctionType, tuple[typing.Any, ...], dict[str, typing.Any]], + typing.Any, +] + +# Per wrap site: id(grafted code) -> (user wrapper, copy of original). +# Each wrap site has a unique cloned code object so the id is stable. +_ddtrace_wrap_registry: dict[int, tuple[_WrapperFn, types.FunctionType]] = {} + + +# Template trampolines. We clone ``__code__`` per wrap site via +# ``CodeType.replace()`` and graft it onto the original; each clone has a +# unique id which the trampoline reads via ``sys._getframe(0).f_code``. def _ddtrace_trampoline_sync(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: wrapper, original_copy = _ddtrace_wrap_registry[id(sys._getframe(0).f_code)] return wrapper(original_copy, args, kwargs) @@ -50,33 +50,22 @@ async def _ddtrace_trampoline_async(*args: typing.Any, **kwargs: typing.Any) -> def _wrap( owner: typing.Any, name: str, - wrapper: typing.Callable[..., typing.Any], + wrapper: _WrapperFn, aliases: typing.Sequence[tuple[typing.Any, str]] = (), ) -> typing.Callable[..., typing.Any]: """Wrap ``owner.name`` so calls go through ``wrapper(original, args, kwargs)``. - For pure-Python functions (``types.FunctionType``) we clone a - template trampoline's bytecode via ``code.replace()`` and graft it - onto the original function in place. Function identity is preserved, - so pre-existing captured references (e.g. ``from X import Y`` - performed before the profiler starts) still see the wrapped - behaviour — this matches what ``ddtrace.internal.wrapping.wrap`` did - via the ``bytecode`` library, without taking on that dependency. - - For non-Python callables (Cython methods, C builtins) we fall back to - ``setattr`` and mirror onto ``aliases``. ``aliases`` is a no-op on the - identity-preserving path (both alias bindings already point at the - same mutated object) and exists only for the fallback case. + Pure-Python no-closure functions: mutate ``__code__`` in place — preserves + identity, so ``from X import Y`` references captured before profiler start + still see the wrap (the uvloop scenario). Same trick ``bytecode.wrap`` did. + + Everything else (Cython, C builtins, closures-via-``super()``): falls back + to ``setattr`` and mirrors onto ``aliases``. """ - original = getattr(owner, name) + original: typing.Any = getattr(owner, name) if isinstance(original, types.FunctionType) and not original.__closure__: - # Identity-preserving path: mutate __code__ in place. - # We require the function to have no closure cells — the template - # trampoline has none, and ``__code__`` swaps must match free-var - # counts. Class methods using super() (e.g. _GatheringFuture.__init__) - # carry a __class__ closure cell and therefore fall through to setattr. - original_copy = types.FunctionType( + original_copy: types.FunctionType = types.FunctionType( original.__code__, original.__globals__, original.__name__, @@ -85,40 +74,31 @@ def _wrap( ) original_copy.__kwdefaults__ = original.__kwdefaults__ - is_async = inspect.iscoroutinefunction(original) + is_async: bool = inspect.iscoroutinefunction(original) template = _ddtrace_trampoline_async if is_async else _ddtrace_trampoline_sync - # Clone the template's bytecode and stamp original's metadata for - # stack-trace clarity. ``replace()`` always returns a new code - # object, so ``id(new_code)`` is unique per wrap site. - new_code = template.__code__.replace( + # Clone the template's bytecode, stamp original's metadata (each + # ``replace()`` returns a fresh code object → unique id per wrap site). + new_code: types.CodeType = template.__code__.replace( co_filename=original.__code__.co_filename, co_firstlineno=original.__code__.co_firstlineno, co_name=original.__code__.co_name, ) _ddtrace_wrap_registry[id(new_code)] = (wrapper, original_copy) - # The trampoline bytecode does LOAD_GLOBAL on ``_ddtrace_wrap_registry`` - # and ``sys``, resolved against original's module globals at call - # time. Inject both (sys is normally already imported by asyncio - # internals, but ``setdefault`` is harmless and covers exotic - # targets). Idempotent across wrap sites in the same module. + # Trampoline does LOAD_GLOBAL on these names, resolved against the + # original's module globals at call time. Idempotent per module. original.__globals__.setdefault("_ddtrace_wrap_registry", _ddtrace_wrap_registry) original.__globals__.setdefault("sys", sys) original.__code__ = new_code - # Preserve introspection: the trampoline's (*args, **kwargs) shape - # would otherwise leak through to ``inspect.signature(original)`` - # and break callers that adapt to the original signature (FastAPI, - # validators, etc.). ``inspect.signature`` follows ``__wrapped__`` - # to return the underlying callable's signature. + # Make ``inspect.signature(original)`` follow through to the real + # signature instead of the trampoline's ``(*args, **kwargs)``. original.__wrapped__ = original_copy # type: ignore[attr-defined] return original - # Fallback for Cython / C builtins or Python functions with closure - # cells (e.g. class methods using ``super()``). Identity isn't - # preserved here; callers that also need to patch aliased bindings - # must pass them via ``aliases``. + # Fallback path — identity NOT preserved. ``aliases`` mirrors the wrap onto + # re-exported bindings (e.g. asyncio.X aliased to asyncio.tasks.X). @wraps(original) def wrapped(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: return wrapper(original, args, kwargs) From dd99cc354241407e91823d82c8641ca9c9d732f6 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 17:24:26 -0400 Subject: [PATCH 16/19] test(profiling): finish type annotations on inner async helpers --- .../collector/test_asyncio_wrapping.py | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/tests/profiling/collector/test_asyncio_wrapping.py b/tests/profiling/collector/test_asyncio_wrapping.py index 81a404b9f25..c8c863dfedb 100644 --- a/tests/profiling/collector/test_asyncio_wrapping.py +++ b/tests/profiling/collector/test_asyncio_wrapping.py @@ -17,6 +17,7 @@ import os import sys +from typing import Any import pytest @@ -52,7 +53,7 @@ async def child() -> int: await asyncio.sleep(0) return 1 - async def main(): + async def main() -> tuple[int, int]: t1 = asyncio.ensure_future(child()) t2 = asyncio.ensure_future(child()) await asyncio.gather(t1, t2) @@ -128,7 +129,7 @@ def test_wait_triggers_link_tasks_per_future() -> None: async def child() -> None: await asyncio.sleep(0) - async def main(): + async def main() -> tuple[int, int]: t1 = asyncio.ensure_future(child()) t2 = asyncio.ensure_future(child()) await asyncio.wait([t1, t2]) @@ -349,7 +350,7 @@ async def good() -> int: async def bad() -> int: raise ValueError("boom") - async def main(): + async def main() -> list[Any]: results = await asyncio.gather(good(), bad(), return_exceptions=True) return [type(r).__name__ if isinstance(r, BaseException) else r for r in results] @@ -369,7 +370,7 @@ async def child(x: int) -> int: await asyncio.sleep(0) return x - async def main(): + async def main() -> tuple[int, int]: t1 = asyncio.ensure_future(child(1)) t2 = asyncio.ensure_future(child(2)) done, pending = await asyncio.wait([t1, t2]) @@ -397,7 +398,7 @@ def test_gather_empty_does_not_link() -> None: with started_profiler(), captured_link_calls("link_tasks") as recorded: - async def main(): + async def main() -> list[Any]: return await asyncio.gather() assert asyncio.run(main()) == [] @@ -418,7 +419,7 @@ def test_create_task_propagates_exception() -> None: async def child() -> int: raise RuntimeError("expected boom") - async def main(): + async def main() -> str | None: t = asyncio.create_task(child()) try: await t @@ -454,17 +455,18 @@ async def child_ok() -> int: async def child_bad() -> int: raise ValueError("expected") - async def main(): + async def main() -> list[tuple[str, str]]: try: - async with asyncio.TaskGroup() as tg: + async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined] tg.create_task(child_ok()) tg.create_task(child_bad()) except BaseException as outer: - exc_strs = [] + exc_strs: list[tuple[str, str]] = [] - def collect(e): - if hasattr(e, "exceptions"): - for sub in e.exceptions: + def collect(e: BaseException) -> None: + sub_excs = getattr(e, "exceptions", None) + if sub_excs is not None: + for sub in sub_excs: collect(sub) else: exc_strs.append((type(e).__name__, str(e))) @@ -507,7 +509,7 @@ def test_wrapping_persists_across_profiler_restart() -> None: async def child() -> None: await asyncio.sleep(0) - async def main(): + async def main() -> int: t = asyncio.create_task(child()) await t return id(t) From 6ed5638c892558ea1bb14e676168d73595291370 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 22:00:18 -0400 Subject: [PATCH 17/19] fix tests --- tests/profiling/collector/test_asyncio_wrapping.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/profiling/collector/test_asyncio_wrapping.py b/tests/profiling/collector/test_asyncio_wrapping.py index c8c863dfedb..9626f538429 100644 --- a/tests/profiling/collector/test_asyncio_wrapping.py +++ b/tests/profiling/collector/test_asyncio_wrapping.py @@ -17,7 +17,6 @@ import os import sys -from typing import Any import pytest @@ -339,6 +338,7 @@ def test_gather_with_return_exceptions_keeps_kwarg() -> None: rather than raising. Verifies that the wrapper doesn't drop the kwarg. """ import asyncio + from typing import Any from tests.profiling.collector._asyncio_wrap_helpers import started_profiler @@ -392,6 +392,7 @@ def test_gather_empty_does_not_link() -> None: fire ``link_tasks`` (no children to link). """ import asyncio + from typing import Any from tests.profiling.collector._asyncio_wrap_helpers import captured_link_calls from tests.profiling.collector._asyncio_wrap_helpers import started_profiler From 70ff900f54db0fc2e02e17125fb30a1c5b37306b Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 11 May 2026 22:18:49 -0400 Subject: [PATCH 18/19] test(profiling): drop __wrapped__ assertion to keep signature test implementation-agnostic --- .../collector/test_asyncio_wrapping.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/tests/profiling/collector/test_asyncio_wrapping.py b/tests/profiling/collector/test_asyncio_wrapping.py index 9626f538429..346c60a64c2 100644 --- a/tests/profiling/collector/test_asyncio_wrapping.py +++ b/tests/profiling/collector/test_asyncio_wrapping.py @@ -1,12 +1,15 @@ """Behavioural tests for the asyncio function-wrapping in ``ddtrace/profiling/_asyncio.py``. -These tests exercise the contract that the wrapping must guarantee — alias -identity preservation, function metadata preservation, and that each profiler -callback fires when the corresponding asyncio API is exercised. They are -deliberately implementation-agnostic: they pass against the bytecode-based -``ddtrace.internal.wrapping.wrap`` (main) and the setattr-based local -``_wrap`` helper introduced in this branch. +These tests exercise the user-visible contract the wrap must guarantee — +identity preservation across module aliases, signature preservation, +correct callback firing per asyncio API, and correct argument +pass-through. They assert observable behaviour only, so they pass against +both ``main``'s ``bytecode.wrap`` (which mutates ``__code__`` in place +and stamps arg metadata onto the new code object) and this branch's +``_wrap`` (which clones a template trampoline via ``code.replace()`` and +sets ``__wrapped__``). Whether either approach is in tree, the contract +is the same. Each test runs in its own subprocess (via ``@pytest.mark.subprocess``) because the wrapping mutates global asyncio state and cannot be safely @@ -201,9 +204,10 @@ def test_set_event_loop_triggers_track_asyncio_loop() -> None: @pytest.mark.subprocess(err=None) def test_wrap_preserves_inspect_signature() -> None: """``inspect.signature(asyncio.tasks.create_task)`` after profiler start - must match the unwrapped signature. The trampoline carries a - ``(*args, **kwargs)`` shape; ``__wrapped__`` set on the original lets - ``inspect.signature`` recover the real argument metadata. + must match the unwrapped signature. Implementations can satisfy this + by either stamping arg metadata onto the trampoline code (``main``'s + ``bytecode.wrap``) or by setting ``__wrapped__`` on the original + (this branch). We only assert the user-visible property. """ import asyncio import inspect @@ -215,9 +219,6 @@ def test_wrap_preserves_inspect_signature() -> None: with started_profiler(): post_sig = inspect.signature(asyncio.tasks.create_task) assert str(post_sig) == str(pre_sig), f"signature regressed under wrap: {pre_sig} -> {post_sig}" - assert hasattr(asyncio.tasks.create_task, "__wrapped__"), ( - "__wrapped__ must be set so inspect.signature() can recover the original signature" - ) # --------------------------------------------------------------------------- From 7c9f4533e75db6e073649a6e03c3f80efe6728fc Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Tue, 12 May 2026 12:18:40 -0400 Subject: [PATCH 19/19] refactor(profiling): extract _wrap helpers into ddtrace.profiling._wrap module --- ddtrace/profiling/_asyncio.py | 89 +--------------------------- ddtrace/profiling/_wrap.py | 108 ++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 88 deletions(-) create mode 100644 ddtrace/profiling/_wrap.py diff --git a/ddtrace/profiling/_asyncio.py b/ddtrace/profiling/_asyncio.py index b41999f6c4e..6879afbddf1 100644 --- a/ddtrace/profiling/_asyncio.py +++ b/ddtrace/profiling/_asyncio.py @@ -2,10 +2,7 @@ from __future__ import annotations from functools import partial -from functools import wraps -import inspect import sys -import types from types import ModuleType import typing @@ -19,96 +16,12 @@ from ddtrace.internal.module import ModuleWatchdog from ddtrace.internal.settings.profiling import config from ddtrace.internal.utils import get_argument_value +from ddtrace.profiling._wrap import wrap as _wrap ASYNCIO_IMPORTED: bool = False -_WrapperFn = typing.Callable[ - [types.FunctionType, tuple[typing.Any, ...], dict[str, typing.Any]], - typing.Any, -] - -# Per wrap site: id(grafted code) -> (user wrapper, copy of original). -# Each wrap site has a unique cloned code object so the id is stable. -_ddtrace_wrap_registry: dict[int, tuple[_WrapperFn, types.FunctionType]] = {} - - -# Template trampolines. We clone ``__code__`` per wrap site via -# ``CodeType.replace()`` and graft it onto the original; each clone has a -# unique id which the trampoline reads via ``sys._getframe(0).f_code``. -def _ddtrace_trampoline_sync(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: - wrapper, original_copy = _ddtrace_wrap_registry[id(sys._getframe(0).f_code)] - return wrapper(original_copy, args, kwargs) - - -async def _ddtrace_trampoline_async(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: - wrapper, original_copy = _ddtrace_wrap_registry[id(sys._getframe(0).f_code)] - return await wrapper(original_copy, args, kwargs) - - -def _wrap( - owner: typing.Any, - name: str, - wrapper: _WrapperFn, - aliases: typing.Sequence[tuple[typing.Any, str]] = (), -) -> typing.Callable[..., typing.Any]: - """Wrap ``owner.name`` so calls go through ``wrapper(original, args, kwargs)``. - - Pure-Python no-closure functions: mutate ``__code__`` in place — preserves - identity, so ``from X import Y`` references captured before profiler start - still see the wrap (the uvloop scenario). Same trick ``bytecode.wrap`` did. - - Everything else (Cython, C builtins, closures-via-``super()``): falls back - to ``setattr`` and mirrors onto ``aliases``. - """ - original: typing.Any = getattr(owner, name) - - if isinstance(original, types.FunctionType) and not original.__closure__: - original_copy: types.FunctionType = types.FunctionType( - original.__code__, - original.__globals__, - original.__name__, - original.__defaults__, - original.__closure__, - ) - original_copy.__kwdefaults__ = original.__kwdefaults__ - - is_async: bool = inspect.iscoroutinefunction(original) - template = _ddtrace_trampoline_async if is_async else _ddtrace_trampoline_sync - - # Clone the template's bytecode, stamp original's metadata (each - # ``replace()`` returns a fresh code object → unique id per wrap site). - new_code: types.CodeType = template.__code__.replace( - co_filename=original.__code__.co_filename, - co_firstlineno=original.__code__.co_firstlineno, - co_name=original.__code__.co_name, - ) - _ddtrace_wrap_registry[id(new_code)] = (wrapper, original_copy) - - # Trampoline does LOAD_GLOBAL on these names, resolved against the - # original's module globals at call time. Idempotent per module. - original.__globals__.setdefault("_ddtrace_wrap_registry", _ddtrace_wrap_registry) - original.__globals__.setdefault("sys", sys) - - original.__code__ = new_code - # Make ``inspect.signature(original)`` follow through to the real - # signature instead of the trampoline's ``(*args, **kwargs)``. - original.__wrapped__ = original_copy # type: ignore[attr-defined] - return original - - # Fallback path — identity NOT preserved. ``aliases`` mirrors the wrap onto - # re-exported bindings (e.g. asyncio.X aliased to asyncio.tasks.X). - @wraps(original) - def wrapped(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: - return wrapper(original, args, kwargs) - - setattr(owner, name, wrapped) - for alias_owner, alias_name in aliases: - setattr(alias_owner, alias_name, wrapped) - return wrapped - - def current_task() -> typing.Optional[asyncio.Task[typing.Any]]: return None diff --git a/ddtrace/profiling/_wrap.py b/ddtrace/profiling/_wrap.py new file mode 100644 index 00000000000..3b672a90ce3 --- /dev/null +++ b/ddtrace/profiling/_wrap.py @@ -0,0 +1,108 @@ +# -*- encoding: utf-8 -*- +"""Identity-preserving function-wrap helper for the profiling internals. + +``_wrap(owner, name, wrapper)`` mutates ``owner.name``'s ``__code__`` in +place via ``CodeType.replace()`` to redirect every call through +``wrapper(original, args, kwargs)``. Function identity is preserved, so +references captured before the wrap was installed (e.g. ``from X import +Y`` performed at another module's import time — see uvloop) still go +through the wrap. This matches the contract of +``ddtrace.internal.wrapping.wrap`` (which uses the ``bytecode`` lib) +without taking on that dependency. + +Currently used only by ``ddtrace.profiling._asyncio``; other profiling +modules can import from here if they need the same behaviour. +""" + +from __future__ import annotations + +from functools import wraps +import inspect +import sys +import types +import typing + + +_WrapperFn = typing.Callable[ + [types.FunctionType, tuple[typing.Any, ...], dict[str, typing.Any]], + typing.Any, +] + +# Per wrap site: id(grafted code) -> (user wrapper, copy of original). +# Each wrap site has a unique cloned code object so the id is stable. +_ddtrace_wrap_registry: dict[int, tuple[_WrapperFn, types.FunctionType]] = {} + + +# Template trampolines. We clone ``__code__`` per wrap site via +# ``CodeType.replace()`` and graft it onto the original; each clone has a +# unique id which the trampoline reads via ``sys._getframe(0).f_code``. +def _ddtrace_trampoline_sync(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: + wrapper, original_copy = _ddtrace_wrap_registry[id(sys._getframe(0).f_code)] + return wrapper(original_copy, args, kwargs) + + +async def _ddtrace_trampoline_async(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: + wrapper, original_copy = _ddtrace_wrap_registry[id(sys._getframe(0).f_code)] + return await wrapper(original_copy, args, kwargs) + + +def wrap( + owner: typing.Any, + name: str, + wrapper: _WrapperFn, + aliases: typing.Sequence[tuple[typing.Any, str]] = (), +) -> typing.Callable[..., typing.Any]: + """Wrap ``owner.name`` so calls go through ``wrapper(original, args, kwargs)``. + + Pure-Python no-closure functions: mutate ``__code__`` in place — preserves + identity, so ``from X import Y`` references captured before profiler start + still see the wrap (the uvloop scenario). Same trick ``bytecode.wrap`` did. + + Everything else (Cython, C builtins, closures-via-``super()``): falls back + to ``setattr`` and mirrors onto ``aliases``. + """ + original: typing.Any = getattr(owner, name) + + if isinstance(original, types.FunctionType) and not original.__closure__: + original_copy: types.FunctionType = types.FunctionType( + original.__code__, + original.__globals__, + original.__name__, + original.__defaults__, + original.__closure__, + ) + original_copy.__kwdefaults__ = original.__kwdefaults__ + + is_async: bool = inspect.iscoroutinefunction(original) + template = _ddtrace_trampoline_async if is_async else _ddtrace_trampoline_sync + + # Clone the template's bytecode, stamp original's metadata (each + # ``replace()`` returns a fresh code object → unique id per wrap site). + new_code: types.CodeType = template.__code__.replace( + co_filename=original.__code__.co_filename, + co_firstlineno=original.__code__.co_firstlineno, + co_name=original.__code__.co_name, + ) + _ddtrace_wrap_registry[id(new_code)] = (wrapper, original_copy) + + # Trampoline does LOAD_GLOBAL on these names, resolved against the + # original's module globals at call time. Idempotent per module. + original.__globals__.setdefault("_ddtrace_wrap_registry", _ddtrace_wrap_registry) + original.__globals__.setdefault("sys", sys) + + original.__code__ = new_code + # Make ``inspect.signature(original)`` follow through to the real + # signature instead of the trampoline's ``(*args, **kwargs)``. + original.__wrapped__ = original_copy # type: ignore[attr-defined] + return original + + # Fallback path — identity NOT preserved. ``aliases`` mirrors the wrap onto + # re-exported bindings (e.g. asyncio.X aliased to asyncio.tasks.X). + @wraps(original) + def wrapped(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: + return wrapper(original, args, kwargs) + + setattr(owner, name, wrapped) + for alias_owner, alias_name in aliases: + setattr(alias_owner, alias_name, wrapped) + return wrapped