@@ -384,14 +384,26 @@ async def polling_loop(self) -> None:
384384 self .queue_empty_flag .clear ()
385385 continue
386386
387- # Check if we have capacity before querying
387+ # Always reap stale AQS rows, even when the pool is full.
388+ # Previously this was gated behind `if self.semaphore.locked()`
389+ # below — which deadlocks the system when all workers are
390+ # wedged on a hung LLM call: cleanup never runs, stale AQS rows
391+ # pile up without bound, and no new work can be claimed. Cleanup
392+ # is cheap, so do it unconditionally on every poll tick.
393+ try :
394+ await self .cleanup_stale_work_units ()
395+ except Exception as cleanup_exc :
396+ logger .exception (
397+ "Stale work unit cleanup failed: %s" , cleanup_exc
398+ )
399+
400+ # Check if we have capacity before querying for new work
388401 if self .semaphore .locked ():
389402 # logger.debug("All workers busy, waiting")
390403 await asyncio .sleep (settings .DERIVER .POLLING_SLEEP_INTERVAL_SECONDS )
391404 continue
392405
393406 try :
394- await self .cleanup_stale_work_units ()
395407 await self .refresh_queue_health_metrics ()
396408 claimed_work_units = await self .get_and_claim_work_units ()
397409 if claimed_work_units :
@@ -843,12 +855,21 @@ async def process_work_unit(self, work_unit_key: str, worker_id: str) -> None:
843855 for item in items_to_process
844856 if item .message_id is not None
845857 ]
846- await process_representation_batch (
847- messages_context ,
848- message_level_configuration ,
849- observers = observers ,
850- observed = work_unit .observed ,
851- queue_item_message_ids = queue_item_message_ids ,
858+ # Bounded by DERIVER.WORK_UNIT_TIMEOUT_SECONDS so a hung
859+ # LLM call (CF Gateway streaming a Gemini response that
860+ # never terminates) raises TimeoutError instead of holding
861+ # the semaphore forever. Without this the worker pool
862+ # deadlocks: every slot held by a hung call, no new claims
863+ # possible. See cleanup_stale_work_units gating fix above.
864+ await asyncio .wait_for (
865+ process_representation_batch (
866+ messages_context ,
867+ message_level_configuration ,
868+ observers = observers ,
869+ observed = work_unit .observed ,
870+ queue_item_message_ids = queue_item_message_ids ,
871+ ),
872+ timeout = settings .DERIVER .WORK_UNIT_TIMEOUT_SECONDS ,
852873 )
853874 await self .mark_queue_items_as_processed (
854875 items_to_process , work_unit_key
@@ -873,7 +894,14 @@ async def process_work_unit(self, work_unit_key: str, worker_id: str) -> None:
873894 break
874895
875896 try :
876- await process_item (queue_item )
897+ # Same WORK_UNIT_TIMEOUT_SECONDS bound as the
898+ # representation path — covers summary/dream/webhook
899+ # task types so a hung specialist call cannot hold
900+ # the semaphore indefinitely.
901+ await asyncio .wait_for (
902+ process_item (queue_item ),
903+ timeout = settings .DERIVER .WORK_UNIT_TIMEOUT_SECONDS ,
904+ )
877905 await self .mark_queue_items_as_processed (
878906 [queue_item ], work_unit_key
879907 )
0 commit comments