Skip to content

AsyncIO: avoid holding exceptions and traceback#806

Open
frankie567 wants to merge 1 commit into
Bogdanp:masterfrom
frankie567:memory-leak-asyncio-ter
Open

AsyncIO: avoid holding exceptions and traceback#806
frankie567 wants to merge 1 commit into
Bogdanp:masterfrom
frankie567:memory-leak-asyncio-ter

Conversation

@frankie567
Copy link
Copy Markdown

@frankie567 frankie567 commented Dec 17, 2025

More complete and reasonable fix for the problem I described in #802.

Problem description

In situations with lot of tasks being executed, and some of them raising exceptions with large tracebacks, we might encounter huge spikes in memory consumption that might lead to OOM issues. Mainly, this is because the garbage collector may not run timely enough under high pressure. In a sense, this is similar to the old issue #351.

The problem has several sources:

  1. The coroutine retains its exception context until it's garbage collected:

async def wrapped_coro() -> R:
try:
return await coro
finally:
done.set()

  1. The future retains the exception context and traceback raised by the coroutine until it's garbage collected:

future = asyncio.run_coroutine_threadsafe(wrapped_coro(), self.loop)
try:
while True:
try:
# Use a timeout to be able to catch asynchronously
# raised dramatiq exceptions (Interrupt).
return future.result(timeout=self.interrupt_check_ival)

Proposed solution

The solution is to make sure to detach the exception context from the coroutine and future so we can manually manage its lifecycle and force garbage collection. Basically, we use local variables to store the result and exception of the coroutine; instead of relying on future.result() directly.

I also added a unit test for this. Admittedly, it's a bit complex since it's hard to replicate this behaviour inside pytest with the stub broker.

Side note

The happy side-effect of this change is that it makes the TimeoutError fix (#791) a bit more readable, since we directly handle the TimeoutError raised inside the coroutine.

@LincolnPuzey
Copy link
Copy Markdown
Collaborator

  1. The coroutine retains its exception context until it's garbage collected:
  1. The future retains the exception context and traceback raised by the coroutine until it's garbage collected:

Is there actually a memory leak? Or is garbage collection just happening too infrequently?

@frankie567
Copy link
Copy Markdown
Author

I'm not sure. I've ran my test script in two scenarios: high pressure (one task every 100 ms) and low pressure (one task every 1 second). Here are the results:

High pressure

Figure_fast

Low pressure

Figure_slow

In both cases, we see the memory grow rapidly but stabilises at some point (but at a very high value). In the low pressure scenario, we do see some drops which shows that we free up memory at some point, but we're still on that "plateau". My reading is that the EventLoopThread might retain the objects from the last run until a new run overrides them.


With the fix in this PR, here is the memory consumption with the same script (one task every 100 ms):

Figure_fast_fix

@frankie567 frankie567 force-pushed the memory-leak-asyncio-ter branch 2 times, most recently from 8c2e1d0 to f8f75c3 Compare January 16, 2026 15:26
@frankie567
Copy link
Copy Markdown
Author

I rebased and fixed the linting errors. What else can I do to push this forward? 🙂

@LincolnPuzey
Copy link
Copy Markdown
Collaborator

Thanks @frankie567. I think I want to understand what is going on a bit deeper before making a change.
If you can share your scripts so I can try re-creating those results that would help.

@frankie567
Copy link
Copy Markdown
Author

Sure, here it is:

import os
import time

import dramatiq
import psutil
from dramatiq.brokers.redis import RedisBroker
from dramatiq.middleware.asyncio import AsyncIO

MAX_BACKOFF = 100 # Adjust the pressure with this constant
MEMORY_LOG_FILE = "memory_usage.csv"

broker = RedisBroker(url="redis://localhost:6379/0")
dramatiq.set_broker(broker)
broker.add_middleware(AsyncIO())


class BigException(Exception):
    def __init__(self, a: bytes) -> None:
        self.a = a
        super().__init__("Big exception")


def log_memory(label: str = "") -> None:
    process = psutil.Process()
    memory_info = process.memory_info()
    timestamp = time.time()
    memory_mb = memory_info.rss / 1024 / 1024

    file_exists = os.path.exists(MEMORY_LOG_FILE)
    with open(MEMORY_LOG_FILE, "a") as f:
        if not file_exists:
            f.write("timestamp,memory_mb,label\n")
        f.write(f"{timestamp},{memory_mb:.2f},{label}\n")


@dramatiq.actor(actor_name="oom_task", max_retries=1_000_000, max_backoff= MAX_BACKOFF)
async def oom_task() -> None:
    log_memory("before_alloc")
    a = bytes(bytearray(128 * 1024 * 1024))
    log_memory("after_alloc")
    raise BigException(a)


if __name__ == "__main__":
    oom_task.send()

One-liner to plot the graph:

uv run --with pandas --with matplotlib python -c "import pandas as pd; import matplotlib.pyplot as plt; df = pd.read_csv('memory_usage.csv'); plt.plot(df['timestamp'] - df['timestamp'].iloc[0], df['memory_mb'], marker='o'); plt.xlabel('Time (s)'); plt.ylabel('Memory (MB)'); plt.title('Memory Usage'); plt.show()"

@frankie567 frankie567 force-pushed the memory-leak-asyncio-ter branch from f8f75c3 to 63ea404 Compare January 19, 2026 13:38
Copy link
Copy Markdown

@themavik themavik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Routing failures through exception_container and re-raising with cleared traceback refs matches the weakref regression goal. nit: import gc is unused in the diff—drop it unless a follow-up commit uses it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants