Skip to content

Commit 7b5d3e6

Browse files
committed
Add asyncio_event_loop (currently only set for asyncio backend) to the kernel and drop old integration code.
1 parent 104ef76 commit 7b5d3e6

File tree

3 files changed

+17
-71
lines changed

3 files changed

+17
-71
lines changed

ipykernel/eventloops.py

Lines changed: 4 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import os
77
import platform
88
import sys
9-
from functools import partial
109

1110
import zmq
1211
from packaging.version import Version as V
@@ -400,63 +399,11 @@ def loop_cocoa_exit(kernel):
400399

401400
@register_integration("asyncio")
402401
def loop_asyncio(kernel):
403-
"""Start a kernel with asyncio event loop support."""
404-
import asyncio
402+
"""Verify the asyncio event loop is supported."""
405403

406-
loop = asyncio.get_event_loop()
407-
# loop is already running (e.g. tornado 5), nothing left to do
408-
if loop.is_running():
409-
return
410-
411-
if loop.is_closed():
412-
# main loop is closed, create a new one
413-
loop = asyncio.new_event_loop()
414-
asyncio.set_event_loop(loop)
415-
loop._should_close = False # type:ignore[attr-defined]
416-
417-
# pause eventloop when there's an event on a zmq socket
418-
def process_stream_events(socket):
419-
"""fall back to main loop when there's a socket event"""
420-
loop.stop()
421-
422-
notifier = partial(process_stream_events, kernel.shell_socket)
423-
loop.add_reader(kernel.shell_socket.getsockopt(zmq.FD), notifier)
424-
loop.call_soon(notifier)
425-
426-
while True:
427-
error = None
428-
try:
429-
loop.run_forever()
430-
except KeyboardInterrupt:
431-
continue
432-
except Exception as e:
433-
error = e
434-
if loop._should_close: # type:ignore[attr-defined]
435-
loop.close()
436-
if error is not None:
437-
raise error
438-
break
439-
440-
441-
@loop_asyncio.exit
442-
def loop_asyncio_exit(kernel):
443-
"""Exit hook for asyncio"""
444-
import asyncio
445-
446-
loop = asyncio.get_event_loop()
447-
448-
async def close_loop():
449-
if hasattr(loop, "shutdown_asyncgens"):
450-
yield loop.shutdown_asyncgens()
451-
loop._should_close = True # type:ignore[attr-defined]
452-
loop.stop()
453-
454-
if loop.is_running():
455-
close_loop()
456-
457-
elif not loop.is_closed():
458-
loop.run_until_complete(close_loop) # type:ignore[arg-type]
459-
loop.close()
404+
if not kernel.asyncio_event_loop or not kernel.asyncio_event_loop.is_running():
405+
msg = "The asyncio event loop is not running so is not supported."
406+
raise RuntimeError(msg)
460407

461408

462409
def set_qt_api_env_from_gui(gui):

ipykernel/kernelbase.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
# Distributed under the terms of the Modified BSD License.
55
from __future__ import annotations
66

7+
import asyncio
8+
import contextlib
79
import inspect
810
import itertools
911
import logging
@@ -128,6 +130,7 @@ class Kernel(SingletonConfigurable):
128130

129131
_send_exec_request: Dict[dict[zmq_anyio.Socket, MemoryObjectSendStream]] = Dict()
130132
_main_subshell_ready = Instance(Event, ())
133+
asyncio_event_loop = Instance(asyncio.AbstractEventLoop, allow_none=True, read_only=True) # type:ignore[call-overload]
131134

132135
log: logging.Logger = Instance(logging.Logger, allow_none=True) # type:ignore[assignment]
133136

@@ -441,10 +444,13 @@ async def shell_main(self, subshell_id: str | None):
441444
tg.start_soon(self._execute_request_handler, receive_stream)
442445
if subshell_id is None:
443446
# Main subshell.
447+
with contextlib.suppress(RuntimeError):
448+
self.set_trait("asyncio_event_loop", asyncio.get_running_loop())
444449
self._main_subshell_ready.set()
445450
await to_thread.run_sync(self.shell_stop.wait)
446451
tg.cancel_scope.cancel()
447452
self._send_exec_request.pop(socket, None)
453+
self.set_trait("asyncio_event_loop", None)
448454
await send_stream.aclose()
449455

450456
async def _execute_request_handler(self, receive_stream: MemoryObjectReceiveStream):
@@ -821,7 +827,7 @@ async def execute_request(self, socket, ident, parent):
821827
self._aborted_time = time.monotonic()
822828
self.log.info("Aborting queue")
823829

824-
def do_execute(
830+
async def do_execute(
825831
self,
826832
code,
827833
silent,

tests/test_eventloop.py

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
"""Test eventloop integration"""
22

3-
import asyncio
43
import os
54
import sys
65
import threading
@@ -80,18 +79,12 @@ def do_thing():
8079
t.join(1)
8180

8281

83-
@windows_skip
84-
@pytest.mark.parametrize("anyio_backend", ["asyncio"])
85-
async def test_asyncio_loop(kernel):
86-
okay = asyncio.Event()
87-
88-
def do_thing():
89-
okay.set()
90-
91-
loop = asyncio.get_event_loop()
92-
loop.call_soon(do_thing)
93-
loop_asyncio(kernel)
94-
await asyncio.wait_for(okay.wait(), 1)
82+
async def test_asyncio_loop(kernel, anyio_backend):
83+
if anyio_backend == "asyncio":
84+
loop_asyncio(kernel)
85+
else:
86+
with pytest.raises(RuntimeError):
87+
loop_asyncio(kernel)
9588

9689

9790
@windows_skip

0 commit comments

Comments
 (0)