@@ -506,6 +506,7 @@ def __init__(self):
506506 self ._callback_loop = asyncio .new_event_loop ()
507507 self ._callback_queue = asyncio .Queue ()
508508 self ._callback_task = None
509+ self ._flush_scheduled = False
509510 self ._callback_thread = threading .Thread (target = self ._start_callback_thread , daemon = True )
510511 self ._callback_thread .start ()
511512
@@ -542,33 +543,61 @@ def _send_data(self, metadata, data, key=None):
542543
543544 coro = self ._send_async (data )
544545 try :
545- asyncio .run_coroutine_threadsafe (coro , self ._send_loop )
546+ fut = asyncio .run_coroutine_threadsafe (coro , self ._send_loop )
546547 except RuntimeError :
547548 coro .close ()
548549 # Event loop is closed — connection is dead, clean up and bail.
549550 if event :
550551 self ._requests .pop (request_id , None )
551552 return None
553+
552554 if event :
555+ def _on_send_done (f , request_id = request_id ):
556+ try :
557+ exc = f .exception ()
558+ except Exception : # cancelled or loop-closed — let the wait time out
559+ return
560+ if exc is None :
561+ return
562+ pending = self ._requests .get (request_id )
563+ if isinstance (pending , tuple ):
564+ self ._requests [request_id ] = exc
565+ pending [0 ].set ()
566+
567+ fut .add_done_callback (_on_send_done )
568+
553569 if not event .wait (timeout = 120 ):
554570 self ._requests .pop (request_id , None )
555571 raise TimeoutError (f"Request { request_id } timed out after 120s" )
556- return self ._requests .pop (request_id )
572+ result = self ._requests .pop (request_id )
573+ if isinstance (result , BaseException ):
574+ raise result
575+ return result
557576
558577 async def _send_async (self , message ):
559578 raise NotImplementedError
560579
561580 def _schedule_flush (self ):
562581 """Schedule a fire-and-forget flush of the release queue via the send loop.
563- Uses a 50ms debounce to let concurrent threads finish their operations."""
582+ Uses a 50ms debounce to let concurrent threads finish their operations.
583+
584+ At most one debounced flush is in flight at a time: further proxy
585+ __del__s in the same window are absorbed by the already-queued flush
586+ (the release queue is drained wholesale), so we never schedule a burst
587+ of overlapping send coroutines on the send loop."""
588+ if self ._flush_scheduled :
589+ return
590+ self ._flush_scheduled = True
564591 coro = self ._debounced_flush ()
565592 try :
566593 asyncio .run_coroutine_threadsafe (coro , self ._send_loop )
567594 except RuntimeError :
595+ self ._flush_scheduled = False
568596 coro .close ()
569597
570598 async def _debounced_flush (self ):
571599 await asyncio .sleep (0.05 ) # 50ms coalescing window
600+ self ._flush_scheduled = False
572601 await self ._async_flush ()
573602
574603 async def _async_flush (self ):
0 commit comments