AsyncIO: avoid holding exceptions and traceback#806
Open
frankie567 wants to merge 1 commit into
Open
Conversation
Collaborator
Is there actually a memory leak? Or is garbage collection just happening too infrequently? |
Author
8c2e1d0 to
f8f75c3
Compare
Author
|
I rebased and fixed the linting errors. What else can I do to push this forward? 🙂 |
Collaborator
|
Thanks @frankie567. I think I want to understand what is going on a bit deeper before making a change. |
Author
|
Sure, here it is: import os
import time
import dramatiq
import psutil
from dramatiq.brokers.redis import RedisBroker
from dramatiq.middleware.asyncio import AsyncIO
MAX_BACKOFF = 100 # Adjust the pressure with this constant
MEMORY_LOG_FILE = "memory_usage.csv"
broker = RedisBroker(url="redis://localhost:6379/0")
dramatiq.set_broker(broker)
broker.add_middleware(AsyncIO())
class BigException(Exception):
def __init__(self, a: bytes) -> None:
self.a = a
super().__init__("Big exception")
def log_memory(label: str = "") -> None:
process = psutil.Process()
memory_info = process.memory_info()
timestamp = time.time()
memory_mb = memory_info.rss / 1024 / 1024
file_exists = os.path.exists(MEMORY_LOG_FILE)
with open(MEMORY_LOG_FILE, "a") as f:
if not file_exists:
f.write("timestamp,memory_mb,label\n")
f.write(f"{timestamp},{memory_mb:.2f},{label}\n")
@dramatiq.actor(actor_name="oom_task", max_retries=1_000_000, max_backoff= MAX_BACKOFF)
async def oom_task() -> None:
log_memory("before_alloc")
a = bytes(bytearray(128 * 1024 * 1024))
log_memory("after_alloc")
raise BigException(a)
if __name__ == "__main__":
oom_task.send()One-liner to plot the graph: uv run --with pandas --with matplotlib python -c "import pandas as pd; import matplotlib.pyplot as plt; df = pd.read_csv('memory_usage.csv'); plt.plot(df['timestamp'] - df['timestamp'].iloc[0], df['memory_mb'], marker='o'); plt.xlabel('Time (s)'); plt.ylabel('Memory (MB)'); plt.title('Memory Usage'); plt.show()" |
f8f75c3 to
63ea404
Compare
themavik
reviewed
Mar 23, 2026
themavik
left a comment
There was a problem hiding this comment.
Routing failures through exception_container and re-raising with cleared traceback refs matches the weakref regression goal. nit: import gc is unused in the diff—drop it unless a follow-up commit uses it.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.



More complete and reasonable fix for the problem I described in #802.
Problem description
In situations with lot of tasks being executed, and some of them raising exceptions with large tracebacks, we might encounter huge spikes in memory consumption that might lead to OOM issues. Mainly, this is because the garbage collector may not run timely enough under high pressure. In a sense, this is similar to the old issue #351.
The problem has several sources:
dramatiq/dramatiq/asyncio.py
Lines 137 to 141 in aa91cdf
dramatiq/dramatiq/asyncio.py
Lines 143 to 149 in aa91cdf
Proposed solution
The solution is to make sure to detach the exception context from the coroutine and future so we can manually manage its lifecycle and force garbage collection. Basically, we use local variables to store the result and exception of the coroutine; instead of relying on
future.result()directly.I also added a unit test for this. Admittedly, it's a bit complex since it's hard to replicate this behaviour inside pytest with the stub broker.
Side note
The happy side-effect of this change is that it makes the TimeoutError fix (#791) a bit more readable, since we directly handle the
TimeoutErrorraised inside the coroutine.