Skip to content

Commit 5b536c7

Browse files
Copilot and lstein committed
Fix: shut down asyncio executor on KeyboardInterrupt to prevent post-generation hang
Co-authored-by: lstein <111189+lstein@users.noreply.github.com>

Fix: cancel pending asyncio tasks before loop.close() to suppress destroyed-task warnings
Co-authored-by: lstein <111189+lstein@users.noreply.github.com>

Fix: suppress stack trace when dispatching events after event loop is closed on shutdown
Co-authored-by: lstein <111189+lstein@users.noreply.github.com>

Fix: cancel in-progress generation on stop() to prevent core dump during mid-flight Ctrl+C
Co-authored-by: lstein <111189+lstein@users.noreply.github.com>
1 parent 0cbe7c7 commit 5b536c7

4 files changed

Lines changed: 190 additions & 0 deletions

File tree

invokeai/app/run_app.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ def get_app():
99

1010
def run_app() -> None:
1111
"""The main entrypoint for the app."""
12+
import asyncio
13+
import sys
14+
import threading
15+
import traceback
16+
1217
from invokeai.frontend.cli.arg_parser import InvokeAIArgs
1318

1419
# Parse the CLI arguments before doing anything else, which ensures CLI args correctly override settings from other
@@ -109,3 +114,32 @@ def run_app() -> None:
109114
from invokeai.app.api.dependencies import ApiDependencies
110115

111116
ApiDependencies.shutdown()
117+
118+
# Cancel any pending asyncio tasks (e.g. socket.io ping tasks) so that loop.close() does
119+
# not emit "Task was destroyed but it is pending!" warnings for each one.
120+
pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
121+
for task in pending:
122+
task.cancel()
123+
if pending:
124+
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
125+
126+
# Shut down the asyncio default thread executor. asyncio.to_thread() (used e.g. in the
127+
# session queue for SQLite operations during generation) creates non-daemon threads via the
128+
# event loop's default ThreadPoolExecutor. Without this call those threads remain alive and
129+
# cause threading._shutdown() to hang indefinitely after the process's main code finishes.
130+
loop.run_until_complete(loop.shutdown_default_executor())
131+
loop.close()
132+
133+
# After graceful shutdown, log any non-daemon threads that are still alive. These are the
134+
# threads that will cause Python's threading._shutdown() to block, preventing the process
135+
# from exiting cleanly. This helps identify threads that need to be fixed or joined.
136+
frames = sys._current_frames()
137+
for thread in threading.enumerate():
138+
if thread.daemon or thread is threading.main_thread():
139+
continue
140+
frame = frames.get(thread.ident)
141+
stack = "".join(traceback.format_stack(frame)) if frame else "(no frame available)"
142+
logger.warning(
143+
f"Non-daemon thread still alive after shutdown: {thread.name!r} "
144+
f"(ident={thread.ident})\nStack trace:\n{stack}"
145+
)

invokeai/app/services/events/events_fastapievents.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ def stop(self, *args, **kwargs):
2828
self._loop.call_soon_threadsafe(self._queue.put_nowait, None)
2929

3030
def dispatch(self, event: EventBase) -> None:
    """Hand *event* to the async dispatch queue in a thread-safe way.

    If the event loop has already been closed (process shutdown), the event
    is silently discarded: there is nowhere left to deliver it, and raising
    here would only produce a spurious stack trace while the generation
    thread winds down.
    """
    loop = self._loop
    if loop.is_closed():
        return
    loop.call_soon_threadsafe(self._queue.put_nowait, event)
3236

3337
async def _dispatch_from_queue(self, stop_event: threading.Event):

invokeai/app/services/session_processor/session_processor_default.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,11 @@ def start(self, invoker: Invoker) -> None:
367367

368368
def stop(self, *args, **kwargs) -> None:
    """Signal the session-processor thread to shut down."""
    self._stop_event.set()
    # Also cancel any in-progress generation so long-running nodes (e.g.
    # denoising) bail out at the next step boundary instead of running to
    # completion. Otherwise the generation thread can still be executing CUDA
    # operations when Python teardown begins, which can end in a C++
    # std::terminate() crash ("terminate called without an active exception").
    self._cancel_event.set()
    # Wake the thread in case it is sleeping in poll_now_event.wait() or
    # blocked in resume_event.wait() (paused).
    self._poll_now_event.set()
    self._resume_event.set()

tests/test_asyncio_shutdown.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
"""
2+
Tests that verify the fix for the two-Ctrl+C shutdown hang.
3+
4+
Root cause: asyncio.to_thread() (used during generation for SQLite session queue operations)
5+
creates non-daemon threads via the event loop's default ThreadPoolExecutor. When the event
6+
loop is interrupted by KeyboardInterrupt without calling loop.shutdown_default_executor() and
7+
loop.close(), those non-daemon threads remain alive and cause threading._shutdown() to block.
8+
9+
The fix in run_app.py:
10+
1. Cancels all pending asyncio tasks (e.g. socket.io ping tasks) to avoid "Task was destroyed
11+
but it is pending!" warnings when loop.close() is called.
12+
2. Calls loop.run_until_complete(loop.shutdown_default_executor()) followed by loop.close()
13+
after ApiDependencies.shutdown(), so all executor threads are cleaned up before the process
14+
begins its Python-level teardown.
15+
"""
16+
17+
from tests.dangerously_run_function_in_subprocess import dangerously_run_function_in_subprocess
18+
19+
20+
def test_asyncio_to_thread_creates_nondaemon_thread():
    """Reproduce the raw symptom behind the two-Ctrl+C hang: after
    run_until_complete() finishes without executor cleanup, asyncio.to_thread()
    leaves a non-daemon executor thread alive."""

    def scenario():
        import asyncio
        import threading

        async def spawn_worker():
            await asyncio.to_thread(lambda: None)

        event_loop = asyncio.new_event_loop()
        event_loop.run_until_complete(spawn_worker())
        # Deliberately skip shutdown_default_executor() and loop.close() to expose the bug.
        main = threading.main_thread()
        leftover = [t for t in threading.enumerate() if not (t.daemon or t is main)]
        # At least one non-daemon executor thread must still be around.
        if not leftover:
            raise AssertionError("Expected a non-daemon thread but found none")
        print("ok")

    stdout, stderr, exit_code = dangerously_run_function_in_subprocess(scenario)
    assert exit_code == 0, stderr
    assert stdout.strip() == "ok"
43+
44+
45+
def test_shutdown_default_executor_cleans_up_nondaemon_threads():
    """Exercise the fix applied in run_app.py: shutdown_default_executor()
    followed by loop.close() leaves no non-daemon threads behind after
    asyncio.to_thread() has been used."""

    def scenario():
        import asyncio
        import threading

        async def spawn_worker():
            await asyncio.to_thread(lambda: None)

        event_loop = asyncio.new_event_loop()
        event_loop.run_until_complete(spawn_worker())

        # Apply the fix
        event_loop.run_until_complete(event_loop.shutdown_default_executor())
        event_loop.close()

        main = threading.main_thread()
        non_daemon = [t for t in threading.enumerate() if not (t.daemon or t is main)]
        if non_daemon:
            raise AssertionError(f"Expected no non-daemon threads but found: {[t.name for t in non_daemon]}")
        print("ok")

    stdout, stderr, exit_code = dangerously_run_function_in_subprocess(scenario)
    assert exit_code == 0, stderr
    assert stdout.strip() == "ok"
71+
72+
73+
def test_shutdown_default_executor_works_after_simulated_keyboard_interrupt():
    """The fix must also hold when run_until_complete() was aborted by a
    KeyboardInterrupt, mirroring the exact flow of run_app.py's
    except-KeyboardInterrupt block."""

    def scenario():
        import asyncio
        import threading

        async def spawn_then_interrupt():
            await asyncio.to_thread(lambda: None)
            raise KeyboardInterrupt

        event_loop = asyncio.new_event_loop()
        try:
            event_loop.run_until_complete(spawn_then_interrupt())
        except KeyboardInterrupt:
            pass

        main = threading.main_thread()
        # The bug: a non-daemon executor thread survives the interrupt.
        if not [t for t in threading.enumerate() if not (t.daemon or t is main)]:
            raise AssertionError("Expected a non-daemon thread before fix")

        # Apply the fix (what run_app.py now does)
        event_loop.run_until_complete(event_loop.shutdown_default_executor())
        event_loop.close()

        non_daemon_after = [t for t in threading.enumerate() if not (t.daemon or t is main)]
        if non_daemon_after:
            raise AssertionError(f"Non-daemon threads remain after fix: {[t.name for t in non_daemon_after]}")
        print("ok")

    stdout, stderr, exit_code = dangerously_run_function_in_subprocess(scenario)
    assert exit_code == 0, stderr
    assert stdout.strip() == "ok"
108+
109+
110+
def test_cancel_pending_tasks_suppresses_destroyed_task_warnings():
    """Cancelling still-pending tasks before loop.close() must keep 'Task was
    destroyed but it is pending!' warnings (e.g. from socket.io ping tasks)
    off stderr."""

    def scenario():
        import asyncio

        async def long_running():
            await asyncio.sleep(1)  # simulates a socket.io ping task

        async def kick_off_then_interrupt():
            asyncio.create_task(long_running())
            await asyncio.to_thread(lambda: None)
            raise KeyboardInterrupt

        event_loop = asyncio.new_event_loop()
        try:
            event_loop.run_until_complete(kick_off_then_interrupt())
        except KeyboardInterrupt:
            pass

        # Apply the task-cancellation fix
        unfinished = [t for t in asyncio.all_tasks(event_loop) if not t.done()]
        for task in unfinished:
            task.cancel()
        if unfinished:
            event_loop.run_until_complete(asyncio.gather(*unfinished, return_exceptions=True))

        event_loop.run_until_complete(event_loop.shutdown_default_executor())
        event_loop.close()
        print("ok")

    stdout, stderr, exit_code = dangerously_run_function_in_subprocess(scenario)
    assert exit_code == 0, stderr
    assert stdout.strip() == "ok"
    # Without the cancellation step, loop.close() emits "Task was destroyed but it
    # is pending!" on stderr for each pending task; after the fix it must be absent.
    assert "Task was destroyed but it is pending" not in stderr

0 commit comments

Comments (0)