Skip to content

Commit 1757bd6

Browse files
committed
Subshells implemented using tornado event loops
1 parent 7603443 commit 1757bd6

File tree

13 files changed

+961
-49
lines changed

13 files changed

+961
-49
lines changed

ipykernel/control.py

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,11 @@
11
"""A thread for a control channel."""
2-
from threading import Thread
32

4-
from tornado.ioloop import IOLoop
3+
from .thread import CONTROL_THREAD_NAME, BaseThread
54

6-
CONTROL_THREAD_NAME = "Control"
75

8-
9-
class ControlThread(Thread):
6+
class ControlThread(BaseThread):
107
"""A thread for a control channel."""
118

129
def __init__(self, **kwargs):
1310
"""Initialize the thread."""
14-
Thread.__init__(self, name=CONTROL_THREAD_NAME, **kwargs)
15-
self.io_loop = IOLoop(make_current=False)
16-
self.pydev_do_not_trace = True
17-
self.is_pydev_daemon_thread = True
18-
19-
def run(self):
20-
"""Run the thread."""
21-
self.name = CONTROL_THREAD_NAME
22-
try:
23-
self.io_loop.start()
24-
finally:
25-
self.io_loop.close()
26-
27-
def stop(self):
28-
"""Stop the thread.
29-
30-
This method is threadsafe.
31-
"""
32-
self.io_loop.add_callback(self.io_loop.stop)
11+
super().__init__(name=CONTROL_THREAD_NAME, **kwargs)

ipykernel/ipkernel.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,11 @@ def set_sigint_result():
361361
# restore the previous sigint handler
362362
signal.signal(signal.SIGINT, save_sigint)
363363

364+
@contextmanager
365+
def _dummy_context_manager(self, *args):
366+
# Signals only work in main thread, so cannot use _cancel_on_sigint in subshells.
367+
yield
368+
364369
async def execute_request(self, stream, ident, parent):
365370
"""Override for cell output - cell reconciliation."""
366371
parent_header = extract_header(parent)
@@ -439,7 +444,12 @@ async def run_cell(*args, **kwargs):
439444

440445
coro_future = asyncio.ensure_future(coro)
441446

442-
with self._cancel_on_sigint(coro_future):
447+
cm = (
448+
self._cancel_on_sigint
449+
if threading.current_thread() == threading.main_thread()
450+
else self._dummy_context_manager
451+
)
452+
with cm(coro_future): # type:ignore[operator]
443453
res = None
444454
try:
445455
res = await coro_future

ipykernel/kernelapp.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
from .iostream import IOPubThread
5454
from .ipkernel import IPythonKernel
5555
from .parentpoller import ParentPollerUnix, ParentPollerWindows
56+
from .shellchannel import ShellChannelThread
5657
from .zmqshell import ZMQInteractiveShell
5758

5859
# -----------------------------------------------------------------------------
@@ -143,6 +144,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp, ConnectionFileMix
143144
iopub_socket = Any()
144145
iopub_thread = Any()
145146
control_thread = Any()
147+
shell_channel_thread = Any()
146148

147149
_ports = Dict()
148150

@@ -367,6 +369,11 @@ def init_control(self, context):
367369
self.control_socket.router_handover = 1
368370

369371
self.control_thread = ControlThread(daemon=True)
372+
self.shell_channel_thread = ShellChannelThread(
373+
context,
374+
self.shell_socket,
375+
daemon=True,
376+
)
370377

371378
def init_iopub(self, context):
372379
"""Initialize the iopub channel."""
@@ -406,6 +413,10 @@ def close(self):
406413
self.log.debug("Closing control thread")
407414
self.control_thread.stop()
408415
self.control_thread.join()
416+
if self.shell_channel_thread and self.shell_channel_thread.is_alive():
417+
self.log.debug("Closing shell channel thread")
418+
self.shell_channel_thread.stop()
419+
self.shell_channel_thread.join()
409420

410421
if self.debugpy_socket and not self.debugpy_socket.closed:
411422
self.debugpy_socket.close()
@@ -546,10 +557,17 @@ def init_signal(self):
546557

547558
def init_kernel(self):
548559
"""Create the Kernel object itself"""
549-
shell_stream = ZMQStream(self.shell_socket)
560+
if self.shell_channel_thread:
561+
shell_stream = ZMQStream(self.shell_socket, self.shell_channel_thread.io_loop)
562+
else:
563+
shell_stream = ZMQStream(self.shell_socket)
550564
control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop)
551565
debugpy_stream = ZMQStream(self.debugpy_socket, self.control_thread.io_loop)
566+
552567
self.control_thread.start()
568+
if self.shell_channel_thread:
569+
self.shell_channel_thread.start()
570+
553571
kernel_factory = self.kernel_class.instance # type:ignore[attr-defined]
554572

555573
kernel = kernel_factory(
@@ -560,6 +578,7 @@ def init_kernel(self):
560578
debug_shell_socket=self.debug_shell_socket,
561579
shell_stream=shell_stream,
562580
control_thread=self.control_thread,
581+
shell_channel_thread=self.shell_channel_thread,
563582
iopub_thread=self.iopub_thread,
564583
iopub_socket=self.iopub_socket,
565584
stdin_socket=self.stdin_socket,

0 commit comments

Comments
 (0)