Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
14137b7
rewrite tracing's 'wrap' that's depending on 'bytecode' lib with a lo…
vlad-scherbich May 9, 2026
2327302
added new unit tests
vlad-scherbich May 10, 2026
a452054
fix tests
vlad-scherbich May 11, 2026
01b4bd6
fix(profiling): preserve coroutine semantics when wrapping async-def …
vlad-scherbich May 11, 2026
b21e146
fix(profiling): preserve function identity in _wrap to instrument pre…
vlad-scherbich May 11, 2026
672f9ce
test: replace weak module-import sanity check with deterministic iden…
vlad-scherbich May 11, 2026
1fd7eb3
test: add behavioural test for pre-cached create_task reference (uvlo…
vlad-scherbich May 11, 2026
6a01376
test: drop overlapping asyncio-wrapping tests (25 -> 15)
vlad-scherbich May 11, 2026
2520510
test: extract profiler-setup boilerplate into _asyncio_wrap_helpers
vlad-scherbich May 11, 2026
1ae2a9c
refactor(profiling): replace exec-based trampoline with code.replace(…
vlad-scherbich May 11, 2026
d1ed59c
fix(profiling): preserve signature metadata via __wrapped__ in identi…
vlad-scherbich May 11, 2026
94ffa0f
test(profiling): cover inspect.signature preservation; assert stack.i…
vlad-scherbich May 11, 2026
348bb66
use template.__code__.replace
vlad-scherbich May 11, 2026
f964247
refactor(profiling): collapse trampoline+dispatcher into one function…
vlad-scherbich May 11, 2026
019a670
refactor(profiling): compact comments and tighten types in _wrap
vlad-scherbich May 11, 2026
ebb1828
test(profiling): finish type annotations on inner async helpers
vlad-scherbich May 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 112 additions & 10 deletions ddtrace/profiling/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
from __future__ import annotations

from functools import partial
from functools import wraps
import inspect
import sys
import types
from types import ModuleType
import typing

Expand All @@ -16,12 +19,96 @@
from ddtrace.internal.module import ModuleWatchdog
from ddtrace.internal.settings.profiling import config
from ddtrace.internal.utils import get_argument_value
from ddtrace.internal.wrapping import wrap


ASYNCIO_IMPORTED: bool = False


_WrapperFn = typing.Callable[
[types.FunctionType, tuple[typing.Any, ...], dict[str, typing.Any]],
typing.Any,
]

# Per wrap site: id(grafted code) -> (user wrapper, copy of original).
# Each wrap site has a unique cloned code object so the id is stable.
_ddtrace_wrap_registry: dict[int, tuple[_WrapperFn, types.FunctionType]] = {}


# Template trampolines. We clone ``__code__`` per wrap site via
# ``CodeType.replace()`` and graft it onto the original; each clone has a
# unique id which the trampoline reads via ``sys._getframe(0).f_code``.
def _ddtrace_trampoline_sync(*args: typing.Any, **kwargs: typing.Any) -> typing.Any:
wrapper, original_copy = _ddtrace_wrap_registry[id(sys._getframe(0).f_code)]
return wrapper(original_copy, args, kwargs)


async def _ddtrace_trampoline_async(*args: typing.Any, **kwargs: typing.Any) -> typing.Any:
wrapper, original_copy = _ddtrace_wrap_registry[id(sys._getframe(0).f_code)]
return await wrapper(original_copy, args, kwargs)


def _wrap(
owner: typing.Any,
name: str,
wrapper: _WrapperFn,
aliases: typing.Sequence[tuple[typing.Any, str]] = (),
) -> typing.Callable[..., typing.Any]:
"""Wrap ``owner.name`` so calls go through ``wrapper(original, args, kwargs)``.

Pure-Python no-closure functions: mutate ``__code__`` in place — preserves
identity, so ``from X import Y`` references captured before profiler start
still see the wrap (the uvloop scenario). Same trick ``bytecode.wrap`` did.

Everything else (Cython, C builtins, closures-via-``super()``): falls back
to ``setattr`` and mirrors onto ``aliases``.
"""
original: typing.Any = getattr(owner, name)

if isinstance(original, types.FunctionType) and not original.__closure__:
original_copy: types.FunctionType = types.FunctionType(
original.__code__,
original.__globals__,
original.__name__,
original.__defaults__,
original.__closure__,
)
original_copy.__kwdefaults__ = original.__kwdefaults__

is_async: bool = inspect.iscoroutinefunction(original)
template = _ddtrace_trampoline_async if is_async else _ddtrace_trampoline_sync

# Clone the template's bytecode, stamp original's metadata (each
# ``replace()`` returns a fresh code object → unique id per wrap site).
new_code: types.CodeType = template.__code__.replace(
co_filename=original.__code__.co_filename,
co_firstlineno=original.__code__.co_firstlineno,
co_name=original.__code__.co_name,
)
_ddtrace_wrap_registry[id(new_code)] = (wrapper, original_copy)

# Trampoline does LOAD_GLOBAL on these names, resolved against the
# original's module globals at call time. Idempotent per module.
original.__globals__.setdefault("_ddtrace_wrap_registry", _ddtrace_wrap_registry)
original.__globals__.setdefault("sys", sys)

original.__code__ = new_code
# Make ``inspect.signature(original)`` follow through to the real
# signature instead of the trampoline's ``(*args, **kwargs)``.
original.__wrapped__ = original_copy # type: ignore[attr-defined]
return original

# Fallback path — identity NOT preserved. ``aliases`` mirrors the wrap onto
# re-exported bindings (e.g. asyncio.X aliased to asyncio.tasks.X).
@wraps(original)
def wrapped(*args: typing.Any, **kwargs: typing.Any) -> typing.Any:
return wrapper(original, args, kwargs)

setattr(owner, name, wrapped)
for alias_owner, alias_name in aliases:
setattr(alias_owner, alias_name, wrapped)
return wrapped
Comment thread
vlad-scherbich marked this conversation as resolved.


def current_task() -> typing.Optional[asyncio.Task[typing.Any]]:
return None

Expand Down Expand Up @@ -104,7 +191,7 @@ def _get_running_loop() -> typing.Optional[aio.AbstractEventLoop]:

if policy_class is not None:

@partial(wrap, policy_class.set_event_loop) # pyright: ignore[reportArgumentType]
@partial(_wrap, policy_class, "set_event_loop") # pyright: ignore[reportArgumentType]
def _(
f: typing.Callable[[object, typing.Optional[aio.AbstractEventLoop]], None],
args: typing.Any,
Expand All @@ -117,7 +204,7 @@ def _(

if init_stack:

@partial(wrap, sys.modules["asyncio"].tasks._GatheringFuture.__init__)
@partial(_wrap, sys.modules["asyncio"].tasks._GatheringFuture, "__init__")
def _(f: typing.Callable[..., None], args: tuple[typing.Any, ...], kwargs: dict[str, typing.Any]) -> None:
try:
return f(*args, **kwargs)
Expand All @@ -133,7 +220,7 @@ def _(f: typing.Callable[..., None], args: tuple[typing.Any, ...], kwargs: dict[
for child in children:
stack.link_tasks(parent, child)

@partial(wrap, sys.modules["asyncio"].tasks._wait)
@partial(_wrap, sys.modules["asyncio"].tasks, "_wait")
def _(
f: typing.Callable[..., tuple[set[aio.Future[typing.Any]], set[aio.Future[typing.Any]]]],
args: tuple[typing.Any, ...],
Expand All @@ -149,7 +236,12 @@ def _(
for future in futures:
stack.link_tasks(parent, future)

@partial(wrap, sys.modules["asyncio"].tasks.as_completed)
@partial(
_wrap,
sys.modules["asyncio"].tasks,
"as_completed",
aliases=[(sys.modules["asyncio"], "as_completed")],
)
def _(
f: typing.Callable[..., typing.Generator[aio.Future[typing.Any], typing.Any, None]],
args: tuple[typing.Any, ...],
Expand Down Expand Up @@ -177,7 +269,12 @@ def _(
return f(*args, **kwargs)

# Wrap asyncio.shield to link parent task to shielded future
@partial(wrap, sys.modules["asyncio"].tasks.shield)
@partial(
_wrap,
sys.modules["asyncio"].tasks,
"shield",
aliases=[(sys.modules["asyncio"], "shield")],
)
def _(
f: typing.Callable[..., aio.Future[typing.Any]],
args: tuple[typing.Any, ...],
Expand Down Expand Up @@ -208,7 +305,7 @@ def _(
taskgroup_class: typing.Optional[type[typing.Any]] = getattr(taskgroups_module, "TaskGroup", None)
if taskgroup_class is not None and hasattr(taskgroup_class, "create_task"):

@partial(wrap, taskgroup_class.create_task)
@partial(_wrap, taskgroup_class, "create_task")
def _(
f: typing.Callable[..., aio.Task[typing.Any]],
args: tuple[typing.Any, ...],
Expand All @@ -228,7 +325,12 @@ def _(
# if it times out. The timeout._task is the same as the current task, so there's
# no parent-child relationship to link. The timeout mechanism is handled by the
# event loop's timeout handler, not by creating new tasks.
@partial(wrap, sys.modules["asyncio"].tasks.create_task)
@partial(
_wrap,
sys.modules["asyncio"].tasks,
"create_task",
aliases=[(sys.modules["asyncio"], "create_task")],
)
def _(
f: typing.Callable[..., aio.Task[typing.Any]],
args: tuple[typing.Any, ...],
Expand Down Expand Up @@ -271,7 +373,7 @@ def _(uvloop: ModuleType) -> None:
)
if new_event_loop_func is not None:

@partial(wrap, new_event_loop_func) # type: ignore[arg-type]
@partial(_wrap, uvloop, "new_event_loop")
def _(
f: typing.Callable[[], asyncio.AbstractEventLoop],
args: tuple[typing.Any, ...],
Expand All @@ -292,7 +394,7 @@ def _(
policy_class: typing.Optional[type[typing.Any]] = getattr(uvloop, "EventLoopPolicy", None)
if policy_class is not None and hasattr(policy_class, "set_event_loop"):

@partial(wrap, policy_class.set_event_loop) # pyright: ignore[reportArgumentType]
@partial(_wrap, policy_class, "set_event_loop") # pyright: ignore[reportArgumentType]
def _(
f: typing.Callable[[object, typing.Optional[asyncio.AbstractEventLoop]], None],
args: typing.Any,
Expand Down
65 changes: 65 additions & 0 deletions tests/profiling/collector/_asyncio_wrap_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Subprocess-body helpers for the asyncio-wrap tests.

``@pytest.mark.subprocess`` extracts only the test function's body and
executes it as a standalone script (see ``FunctionDefFinder`` in
``tests/conftest.py``). Module-level definitions in the test file are
therefore invisible to the subprocess. Helpers must live in a separate
importable module — which this file is.

Imported as ``from tests.profiling.collector._asyncio_wrap_helpers import ...``
inside each subprocess test body.
"""

from __future__ import annotations

from contextlib import contextmanager
from typing import Any
from typing import Callable
from typing import Iterator


@contextmanager
def started_profiler() -> Iterator[Any]:
"""Context manager that starts the profiler on entry and stops it on
exit, even on assertion failure. Yields the Profiler instance.
"""
from ddtrace.profiling import profiler

p = profiler.Profiler()
p.start()
try:
yield p
finally:
p.stop()


@contextmanager
def captured_link_calls(attr: str) -> Iterator[list[int]]:
"""Replace ``ddtrace.internal.datadog.profiling.stack.<attr>`` with a
recorder that captures the child task id of each call, yielding the
growing list. Restores the original on exit.

``attr`` is one of ``"link_tasks"`` / ``"weak_link_tasks"`` /
``"track_asyncio_loop"`` — anything with a ``(_, second_arg)`` shape
where the second arg is the thing we want to identify by id.

Raises ``AssertionError`` (with ``stack.failure_msg``) if the native
stack extension is unavailable — surfaces the root cause clearly
rather than an opaque ``AttributeError`` on ``getattr(stack, attr)``.
"""
from ddtrace.internal.datadog.profiling import stack

assert stack.is_available, stack.failure_msg

original: Callable[..., Any] = getattr(stack, attr)
recorded: list[int] = []
Comment thread
vlad-scherbich marked this conversation as resolved.

def recorder(first: Any, second: Any) -> Any:
recorded.append(id(second))
return original(first, second)

setattr(stack, attr, recorder)
try:
yield recorded
finally:
setattr(stack, attr, original)
Loading
Loading