Skip to content

Commit ac611c5

Browse files
fix(deriver): break worker-pool deadlock on hung LLM calls
Two compounding bugs caused the deriver worker pool to wedge after a single CF-Gateway-streamed Gemini response failed to terminate: 1. process_work_unit holds `async with self.semaphore` across the inner LLM call (process_representation_batch / process_item). With no asyncio-level timeout, a hung HTTP read held the slot forever. Eight workers x one hung call each = pool fully locked. 2. polling_loop gated cleanup_stale_work_units behind `if self.semaphore.locked(): continue`, so once the pool was full the stale-AQS cleanup never ran. STALE_SESSION_TIMEOUT_MINUTES became dead-lettered. Pod restarts didn't help: new pods reclaimed the same poisoned work_unit_keys and re-wedged within minutes. Fixes: - Add DERIVER_WORK_UNIT_TIMEOUT_SECONDS (default 600s) and wrap both process_representation_batch and process_item in asyncio.wait_for. TimeoutError propagates to _handle_processing_error, the `async with` unwinds, the semaphore slot releases. - Move cleanup_stale_work_units above the semaphore-locked check so AQS rows always get reaped on every poll tick, even with a full pool. Cleanup is cheap; running it unconditionally costs one index scan per poll. Symptoms before fix: active_queue_sessions rows aging past STALE_SESSION_TIMEOUT_MINUTES, queue.processed=false count climbing into thousands across all task types, deriver pod alive (PID 1 ok) but log output silent for hours.
1 parent 7fec670 commit ac611c5

2 files changed

Lines changed: 48 additions & 9 deletions

File tree

src/config.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,17 @@ class DeriverSettings(HonchoSettings):
713713
] = 1.0
714714
STALE_SESSION_TIMEOUT_MINUTES: Annotated[int, Field(default=5, gt=0, le=1440)] = 5
715715

716+
# Hard upper bound on a single work-unit pass through process_work_unit.
717+
# If the inner LLM call hangs (e.g. CF Gateway streaming a Gemini response
718+
# that never terminates), asyncio.wait_for raises TimeoutError, the
719+
# `async with self.semaphore` block exits, and the slot is released so
720+
# other workers can claim other work units. Without this bound the worker
721+
# pool deadlocks: all N slots held by hung tasks, semaphore permanently
722+
# locked, polling loop never reaches cleanup_stale_work_units.
723+
WORK_UNIT_TIMEOUT_SECONDS: Annotated[
724+
int, Field(default=600, gt=0, le=7200)
725+
] = 600
726+
716727
# Retention window (seconds) for keeping errored items in the queue
717728
QUEUE_ERROR_RETENTION_SECONDS: Annotated[
718729
int, Field(default=30 * 24 * 3600, gt=0)

src/deriver/queue_manager.py

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -384,14 +384,26 @@ async def polling_loop(self) -> None:
384384
self.queue_empty_flag.clear()
385385
continue
386386

387-
# Check if we have capacity before querying
387+
# Always reap stale AQS rows, even when the pool is full.
388+
# Previously this was gated behind `if self.semaphore.locked()`
389+
# below — which deadlocks the system when all workers are
390+
# wedged on a hung LLM call: cleanup never runs, AQS rows go
391+
# unbounded, no new claims possible. Cleanup is cheap; do it
392+
# unconditionally on every poll tick.
393+
try:
394+
await self.cleanup_stale_work_units()
395+
except Exception as cleanup_exc:
396+
logger.exception(
397+
"Stale work unit cleanup failed: %s", cleanup_exc
398+
)
399+
400+
# Check if we have capacity before querying for new work
388401
if self.semaphore.locked():
389402
# logger.debug("All workers busy, waiting")
390403
await asyncio.sleep(settings.DERIVER.POLLING_SLEEP_INTERVAL_SECONDS)
391404
continue
392405

393406
try:
394-
await self.cleanup_stale_work_units()
395407
await self.refresh_queue_health_metrics()
396408
claimed_work_units = await self.get_and_claim_work_units()
397409
if claimed_work_units:
@@ -843,12 +855,21 @@ async def process_work_unit(self, work_unit_key: str, worker_id: str) -> None:
843855
for item in items_to_process
844856
if item.message_id is not None
845857
]
846-
await process_representation_batch(
847-
messages_context,
848-
message_level_configuration,
849-
observers=observers,
850-
observed=work_unit.observed,
851-
queue_item_message_ids=queue_item_message_ids,
858+
# Bounded by DERIVER.WORK_UNIT_TIMEOUT_SECONDS so a hung
859+
# LLM call (CF Gateway streaming a Gemini response that
860+
# never terminates) raises TimeoutError instead of holding
861+
# the semaphore forever. Without this the worker pool
862+
# deadlocks: every slot held by a hung call, no new claims
863+
# possible. See cleanup_stale_work_units gating fix above.
864+
await asyncio.wait_for(
865+
process_representation_batch(
866+
messages_context,
867+
message_level_configuration,
868+
observers=observers,
869+
observed=work_unit.observed,
870+
queue_item_message_ids=queue_item_message_ids,
871+
),
872+
timeout=settings.DERIVER.WORK_UNIT_TIMEOUT_SECONDS,
852873
)
853874
await self.mark_queue_items_as_processed(
854875
items_to_process, work_unit_key
@@ -873,7 +894,14 @@ async def process_work_unit(self, work_unit_key: str, worker_id: str) -> None:
873894
break
874895

875896
try:
876-
await process_item(queue_item)
897+
# Same WORK_UNIT_TIMEOUT_SECONDS bound as the
898+
# representation path — covers summary/dream/webhook
899+
# task types so a hung specialist call cannot hold
900+
# the semaphore indefinitely.
901+
await asyncio.wait_for(
902+
process_item(queue_item),
903+
timeout=settings.DERIVER.WORK_UNIT_TIMEOUT_SECONDS,
904+
)
877905
await self.mark_queue_items_as_processed(
878906
[queue_item], work_unit_key
879907
)

0 commit comments

Comments
 (0)