3333)
3434
3535from robotcode .core .async_tools import (
36- async_event ,
3736 create_sub_task ,
3837 run_coroutine_in_thread ,
3938)
4039from robotcode .core .concurrent import FutureEx , is_threaded_callable , run_in_thread
40+ from robotcode .core .event import event
4141from robotcode .core .utils .dataclasses import as_json , from_dict
4242from robotcode .core .utils .inspect import ensure_coroutine , iter_methods
4343from robotcode .core .utils .logging import LoggingDescriptor
@@ -374,12 +374,12 @@ def __init__(self) -> None:
374374 def loop (self ) -> Optional [asyncio .AbstractEventLoop ]:
375375 return self ._loop
376376
377- @async_event
378- async def on_connection_made (sender , transport : asyncio .BaseTransport ) -> None :
377+ @event
378+ def on_connection_made (sender , transport : asyncio .BaseTransport ) -> None :
379379 ...
380380
381- @async_event
382- async def on_connection_lost (sender , exc : Optional [BaseException ]) -> None :
381+ @event
382+ def on_connection_lost (sender , exc : Optional [BaseException ]) -> None :
383383 ...
384384
385385 def connection_made (self , transport : asyncio .BaseTransport ) -> None :
@@ -390,10 +390,10 @@ def connection_made(self, transport: asyncio.BaseTransport) -> None:
390390 if isinstance (transport , asyncio .WriteTransport ):
391391 self .write_transport = transport
392392
393- create_sub_task ( self .on_connection_made (self , transport ) )
393+ self .on_connection_made (self , transport )
394394
395395 def connection_lost (self , exc : Optional [BaseException ]) -> None :
396- create_sub_task ( self .on_connection_lost (self , exc ) )
396+ self .on_connection_lost (self , exc )
397397 self ._loop = None
398398
399399 def eof_received (self ) -> Optional [bool ]:
@@ -451,6 +451,7 @@ def __init__(self) -> None:
451451 self ._received_request : OrderedDict [Union [str , int , None ], ReceivedRequestEntry ] = OrderedDict ()
452452 self ._received_request_lock = threading .RLock ()
453453 self ._signature_cache : Dict [Callable [..., Any ], inspect .Signature ] = {}
454+ self ._running_handle_message_tasks : Set [asyncio .Future [Any ]] = set ()
454455
455456 @staticmethod
456457 def _generate_json_rpc_messages_from_dict (
@@ -494,15 +495,10 @@ def _handle_body(self, body: bytes, charset: str) -> None:
494495 self .send_error (JsonRPCErrors .PARSE_ERROR , f"{ type (e ).__name__ } : { e } " )
495496
496497 def _handle_messages (self , iterator : Iterator [JsonRPCMessage ]) -> None :
497- def done (f : asyncio .Future [Any ]) -> None :
498- if f .done () and not f .cancelled ():
499- ex = f .exception ()
500-
501- if ex is None or isinstance (ex , asyncio .CancelledError ):
502- return
503-
504498 for m in iterator :
505- create_sub_task (self .handle_message (m )).add_done_callback (done )
499+ task = asyncio .create_task (self .handle_message (m ))
500+ self ._running_handle_message_tasks .add (task )
501+ task .add_done_callback (self ._running_handle_message_tasks .discard )
506502
507503 @__logger .call
508504 async def handle_message (self , message : JsonRPCMessage ) -> None :
0 commit comments