5050 Instance ,
5151 Integer ,
5252 List ,
53- Set ,
5453 Unicode ,
5554 default ,
5655 observe ,
@@ -240,9 +239,6 @@ def _parent_header(self):
240239 # by record_ports and used by connect_request.
241240 _recorded_ports = Dict ()
242241
243- # set of aborted msg_ids
244- aborted = Set ()
245-
246242 # Track execution count here. For IPython, we override this to use the
247243 # execution count we store in the shell.
248244 execution_count = 0
@@ -258,14 +254,10 @@ def _parent_header(self):
258254 "shutdown_request" ,
259255 "is_complete_request" ,
260256 "interrupt_request" ,
261- # deprecated:
262- "apply_request" ,
263257 ]
264258 # add deprecated ipyparallel control messages
265259 control_msg_types = [
266260 * msg_types ,
267- "clear_request" ,
268- "abort_request" ,
269261 "debug_request" ,
270262 "usage_request" ,
271263 "create_subshell_request" ,
@@ -343,13 +335,11 @@ def should_handle(self, stream, msg, idents):
343335 """Check whether a shell-channel message should be handled
344336
345337 Allows subclasses to prevent handling of certain messages (e.g. aborted requests).
338+
339+ .. versionchanged:: 7
340+ Subclass should_handle _may_ be async.
341+ Base class implementation is not async.
346342 """
347- msg_id = msg ["header" ]["msg_id" ]
348- if msg_id in self .aborted :
349- # is it safe to assume a msg_id will not be resubmitted?
350- self .aborted .remove (msg_id )
351- self ._send_abort_reply (stream , msg , idents )
352- return False
353343 return True
354344
355345 async def dispatch_shell (self , msg , / , subshell_id : str | None = None ):
@@ -412,8 +402,12 @@ async def dispatch_shell(self, msg, /, subshell_id: str | None = None):
412402 self .log .debug ("\n *** MESSAGE TYPE:%s***" , msg_type )
413403 self .log .debug (" Content: %s\n --->\n " , msg ["content" ])
414404
415- if not self .should_handle (stream , msg , idents ):
405+ should_handle : bool | t .Awaitable [bool ] = self .should_handle (stream , msg , idents )
406+ if inspect .isawaitable (should_handle ):
407+ should_handle = await should_handle
408+ if not should_handle :
416409 self ._publish_status_and_flush ("idle" , "shell" , stream )
410+ self .log .debug ("Not handling %s:%s" , msg_type , msg ["header" ].get ("msg_id" ))
417411 return
418412
419413 handler = self .shell_handlers .get (msg_type , None )
@@ -1228,88 +1222,6 @@ async def list_subshell_request(self, socket, ident, parent) -> None:
12281222
12291223 self .session .send (socket , "list_subshell_reply" , reply , parent , ident )
12301224
1231- # ---------------------------------------------------------------------------
1232- # Engine methods (DEPRECATED)
1233- # ---------------------------------------------------------------------------
1234-
1235- async def apply_request (self , stream , ident , parent ): # pragma: no cover
1236- """Handle an apply request."""
1237- self .log .warning ("apply_request is deprecated in kernel_base, moving to ipyparallel." )
1238- try :
1239- content = parent ["content" ]
1240- bufs = parent ["buffers" ]
1241- msg_id = parent ["header" ]["msg_id" ]
1242- except Exception :
1243- self .log .error ("Got bad msg: %s" , parent , exc_info = True ) # noqa: G201
1244- return
1245-
1246- md = self .init_metadata (parent )
1247-
1248- reply_content , result_buf = self .do_apply (content , bufs , msg_id , md )
1249-
1250- # flush i/o
1251- if sys .stdout is not None :
1252- sys .stdout .flush ()
1253- if sys .stderr is not None :
1254- sys .stderr .flush ()
1255-
1256- md = self .finish_metadata (parent , md , reply_content )
1257- if not self .session :
1258- return
1259- self .session .send (
1260- stream ,
1261- "apply_reply" ,
1262- reply_content ,
1263- parent = parent ,
1264- ident = ident ,
1265- buffers = result_buf ,
1266- metadata = md ,
1267- )
1268-
1269- def do_apply (self , content , bufs , msg_id , reply_metadata ):
1270- """DEPRECATED"""
1271- raise NotImplementedError
1272-
1273- # ---------------------------------------------------------------------------
1274- # Control messages (DEPRECATED)
1275- # ---------------------------------------------------------------------------
1276-
1277- async def abort_request (self , stream , ident , parent ): # pragma: no cover
1278- """abort a specific msg by id"""
1279- self .log .warning (
1280- "abort_request is deprecated in kernel_base. It is only part of IPython parallel"
1281- )
1282- msg_ids = parent ["content" ].get ("msg_ids" , None )
1283- if isinstance (msg_ids , str ):
1284- msg_ids = [msg_ids ]
1285- if not msg_ids :
1286- subshell_id = parent ["header" ].get ("subshell_id" )
1287- self ._abort_queues (subshell_id )
1288-
1289- for mid in msg_ids :
1290- self .aborted .add (str (mid ))
1291-
1292- content = dict (status = "ok" )
1293- if not self .session :
1294- return
1295- reply_msg = self .session .send (
1296- stream , "abort_reply" , content = content , parent = parent , ident = ident
1297- )
1298- self .log .debug ("%s" , reply_msg )
1299-
1300- async def clear_request (self , stream , idents , parent ): # pragma: no cover
1301- """Clear our namespace."""
1302- self .log .warning (
1303- "clear_request is deprecated in kernel_base. It is only part of IPython parallel"
1304- )
1305- content = self .do_clear ()
1306- if self .session :
1307- self .session .send (stream , "clear_reply" , ident = idents , parent = parent , content = content )
1308-
1309- def do_clear (self ):
1310- """DEPRECATED since 4.0.3"""
1311- raise NotImplementedError
1312-
13131225 # ---------------------------------------------------------------------------
13141226 # Protected interface
13151227 # ---------------------------------------------------------------------------
0 commit comments