|
24 | 24 | import logging |
25 | 25 | import os |
26 | 26 | import sys |
| 27 | +import threading |
| 28 | +import time |
27 | 29 | import uuid |
28 | 30 | from typing import Any |
29 | 31 |
|
|
58 | 60 | # the server (or async generator) stops producing events. |
59 | 61 | STREAM_IDLE_TIMEOUT = 1800 |
60 | 62 |
|
# Watchdog: a plain (non-asyncio) thread that force-kills the process when
# no activity has been reported for too long. Because it runs outside the
# event loop, it can still fire when the loop itself is stuck (dead
# connections, asyncio cleanup spin, MCP cleanup, etc.).
# Idle limit in seconds; overridable via the WATCHDOG_IDLE_TIMEOUT env var.
WATCHDOG_IDLE_TIMEOUT = int(os.getenv("WATCHDOG_IDLE_TIMEOUT", "1800"))  # 30 min default

# Monotonic timestamp of the most recent watchdog_ping(); read and written
# only while holding _watchdog_lock.
_watchdog_lock = threading.Lock()
_watchdog_last_activity = time.monotonic()
| 70 | + |
| 71 | + |
def watchdog_ping() -> None:
    """Record the current time as the latest sign of life for the watchdog.

    The shared timestamp is only written while holding ``_watchdog_lock``,
    so this can be called from coroutines, callbacks, or other threads.
    """
    global _watchdog_last_activity
    with _watchdog_lock:
        now = time.monotonic()
        _watchdog_last_activity = now
| 77 | + |
| 78 | + |
def _watchdog_thread(timeout: int) -> None:
    """Background thread body: force-exit after *timeout* seconds of inactivity.

    Wakes up periodically, measures the time elapsed since the last
    ``watchdog_ping()`` and, once it exceeds *timeout*, logs an error and
    terminates the process with exit status 2 via ``os._exit``. Never returns.

    Args:
        timeout: Maximum number of seconds tolerated without a ping.
    """
    # Poll at one fifth of the timeout, clamped to the range 1..60 seconds.
    check_interval = min(60, max(1, timeout // 5))
    while True:
        time.sleep(check_interval)
        with _watchdog_lock:
            idle = time.monotonic() - _watchdog_last_activity
        if idle <= timeout:
            continue
        try:
            logging.error(
                f"Watchdog: no activity for {idle:.0f}s (limit {timeout}s) — "
                "force-exiting to prevent hang"
            )
            for stream in (sys.stderr, sys.stdout):
                try:
                    stream.flush()
                except Exception:
                    # Best effort only: a closed/broken/missing stream must
                    # not prevent the other flush or the exit below.
                    pass
        finally:
            # Must run even if logging or flushing raised; otherwise this
            # thread would die and the hung process would never be killed.
            os._exit(2)
| 94 | + |
| 95 | + |
# True once the watchdog thread has been launched; guarded by _watchdog_lock.
_watchdog_started = False


def start_watchdog(timeout: int = WATCHDOG_IDLE_TIMEOUT) -> None:
    """Start the watchdog daemon thread; repeated calls are no-ops.

    The first call resets the activity timestamp and launches
    ``_watchdog_thread``. Later calls return immediately, so at most one
    watchdog thread is started through this function.

    Args:
        timeout: Idle limit in seconds passed to the watchdog thread. Only
            the value given on the first call takes effect.
    """
    global _watchdog_started
    with _watchdog_lock:
        if _watchdog_started:
            return
        _watchdog_started = True
    # Start the idle clock now rather than at module import time. Called
    # outside the `with` block because watchdog_ping() takes the same
    # non-reentrant lock.
    watchdog_ping()
    t = threading.Thread(
        target=_watchdog_thread,
        args=(timeout,),
        name="watchdog",
        daemon=True,
    )
    t.start()
| 100 | + |
61 | 101 |
|
62 | 102 | def _resolve_model_config( |
63 | 103 | available_tools: AvailableTools, |
@@ -417,6 +457,7 @@ async def _run_streamed() -> None: |
417 | 457 | ) |
418 | 458 | raise APITimeoutError("Stream idle timeout exceeded") |
419 | 459 | if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): |
| 460 | + watchdog_ping() |
420 | 461 | await render_model_output(event.data.delta, async_task=async_task, task_id=task_id) |
421 | 462 | finally: |
422 | 463 | if stream is not None: |
@@ -464,6 +505,7 @@ async def _run_streamed() -> None: |
464 | 505 | finally: |
465 | 506 | # Close the AsyncOpenAI client to release httpx connection pool. |
466 | 507 | # Dead CLOSE_WAIT sockets in the pool cause kqueue CPU spin if left open. |
| 508 | + watchdog_ping() |
467 | 509 | if agent0 is not None: |
468 | 510 | await agent0.close() |
469 | 511 | start_cleanup.set() |
@@ -506,12 +548,18 @@ async def run_main( |
506 | 548 | """ |
507 | 549 | from .session import TaskflowSession |
508 | 550 |
|
| 551 | + # Start the watchdog thread — if the process hangs for any reason |
| 552 | + # (asyncio spin, dead connections, MCP cleanup), this kills it. |
| 553 | + start_watchdog() |
| 554 | + |
509 | 555 | last_mcp_tool_results: list[str] = [] |
510 | 556 |
|
511 | 557 | async def on_tool_end_hook(context: RunContextWrapper[TContext], agent: Agent[TContext], tool: Tool, result: str) -> None: |
| 558 | + watchdog_ping() |
512 | 559 | last_mcp_tool_results.append(result) |
513 | 560 |
|
514 | 561 | async def on_tool_start_hook(context: RunContextWrapper[TContext], agent: Agent[TContext], tool: Tool) -> None: |
| 562 | + watchdog_ping() |
515 | 563 | await render_model_output(f"\n** 🤖🛠️ Tool Call: {tool.name}\n") |
516 | 564 |
|
517 | 565 | async def on_handoff_hook(context: RunContextWrapper[TContext], agent: Agent[TContext], source: Agent[TContext]) -> None: |
|
0 commit comments