@@ -480,8 +480,6 @@ def __init__(self):
480480 super ().__init__ ()
481481 self ._send_loop = asyncio .new_event_loop ()
482482 self ._callback_loop = asyncio .new_event_loop ()
483- self ._callback_queue = asyncio .Queue ()
484-
485483 self ._callback_thread = threading .Thread (target = self ._start_callback_thread , daemon = True )
486484 self ._callback_thread .start ()
487485
@@ -516,7 +514,13 @@ def _send_data(self, metadata, data, key=None):
516514 event = threading .Event ()
517515 self ._requests [request_id ] = event , key
518516
519- asyncio .run_coroutine_threadsafe (self ._send_async (data ), self ._send_loop )
517+ try :
518+ asyncio .run_coroutine_threadsafe (self ._send_async (data ), self ._send_loop )
519+ except RuntimeError :
520+ # Event loop is closed — connection is dead, clean up and bail.
521+ if event :
522+ self ._requests .pop (request_id , None )
523+ return None
520524 if event :
521525 event .wait ()
522526 return self ._requests .pop (request_id )
@@ -530,15 +534,19 @@ async def handle_callbacks():
530534 try :
531535 func , args = await self ._callback_queue .get ()
532536 func (* args )
537+ except asyncio .CancelledError :
538+ break
539+ except RuntimeError :
540+ break
533541 except asyncio .QueueEmpty :
534542 pass
535543 except Exception as e :
536544 print ("error in callback" , type (e ), str (e ))
537- # await asyncio.sleep(0.01)
538545
539546 try :
540547 self ._callback_loop = asyncio .new_event_loop ()
541548 asyncio .set_event_loop (self ._callback_loop )
549+ self ._callback_queue = asyncio .Queue ()
542550 self ._callback_loop .create_task (handle_callbacks ())
543551 self ._callback_loop .run_forever ()
544552 except Exception as e :
0 commit comments