|
| 1 | +# SPDX-FileCopyrightText: GitHub, Inc. |
| 2 | +# SPDX-License-Identifier: MIT |
| 3 | + |
| 4 | +"""Stream-driving helpers for the runner. |
| 5 | +
|
| 6 | +This module owns the inner loop that consumes events from a backend |
| 7 | +adapter (`TextDelta` / `ToolEnd`), renders text deltas to the user, and |
| 8 | +bridges Copilot-side tool events into the run-hook callbacks that the |
| 9 | +runner uses to capture MCP results for ``repeat_prompt`` and session |
| 10 | +checkpointing. |
| 11 | +
|
| 12 | +Extracted from ``runner.py`` so the rate-limit/retry loop and the |
| 13 | +backend-event translation are independently readable and testable. |
| 14 | +""" |
| 15 | + |
| 16 | +from __future__ import annotations |
| 17 | + |
| 18 | +__all__ = ["STREAM_IDLE_TIMEOUT", "bridge_copilot_tool_event", "drive_backend_stream"] |
| 19 | + |
| 20 | +import asyncio |
| 21 | +import json |
| 22 | +import logging |
| 23 | +from types import SimpleNamespace |
| 24 | +from typing import Any |
| 25 | + |
| 26 | +from ._watchdog import watchdog_ping |
| 27 | +from .render_utils import render_model_output |
| 28 | +from .sdk import TextDelta, ToolEnd |
| 29 | +from .sdk.errors import BackendRateLimitError, BackendTimeoutError |
| 30 | + |
| 31 | +# Application-level backstop: if the backend's event stream goes silent |
| 32 | +# for this long, surface a BackendTimeoutError so the retry loop can |
| 33 | +# recover. This complements the TCP-level httpx timeouts in the |
| 34 | +# openai-agents adapter — those catch dead sockets, this catches the |
| 35 | +# subtler case where the connection stays open but nothing is flowing. |
| 36 | +STREAM_IDLE_TIMEOUT = 1800 |
| 37 | + |
| 38 | + |
| 39 | +async def bridge_copilot_tool_event(event: ToolEnd, run_hooks: Any) -> None: |
| 40 | + """Forward a Copilot ``ToolEnd`` into the openai-agents-style hooks. |
| 41 | +
|
| 42 | + The runner captures MCP tool output via ``run_hooks.on_tool_end``, |
| 43 | + which the openai-agents path drives natively. The Copilot adapter |
| 44 | + surfaces tool completions as ``ToolEnd`` events instead, so we |
| 45 | + invoke the same hooks here with: |
| 46 | +
|
| 47 | + * a ``SimpleNamespace(name=...)`` placeholder in lieu of the |
| 48 | + openai-agents ``Tool`` object — the hooks only read ``.name``. |
| 49 | + * a ``json.dumps({"text": ...})`` envelope around the result text, |
| 50 | + matching the wire format openai-agents uses when serialising MCP |
| 51 | + ``TextContent`` lists. ``_build_prompts_to_run`` in the runner |
| 52 | + depends on that exact envelope shape, so both backends produce |
| 53 | + identical entries in ``last_mcp_tool_results``. |
| 54 | + """ |
| 55 | + if run_hooks is None: |
| 56 | + return |
| 57 | + fake_tool = SimpleNamespace(name=event.tool_name) |
| 58 | + payload = json.dumps({"text": event.text}) |
| 59 | + await run_hooks.on_tool_start(None, None, fake_tool) |
| 60 | + await run_hooks.on_tool_end(None, None, fake_tool, payload) |
| 61 | + |
| 62 | + |
| 63 | +async def drive_backend_stream( |
| 64 | + *, |
| 65 | + backend_impl: Any, |
| 66 | + agent_handle: Any, |
| 67 | + prompt: str, |
| 68 | + max_turns: int, |
| 69 | + run_hooks: Any, |
| 70 | + async_task: bool, |
| 71 | + task_id: str, |
| 72 | + max_api_retry: int, |
| 73 | + initial_rate_limit_backoff: int, |
| 74 | + max_rate_limit_backoff: int, |
| 75 | +) -> None: |
| 76 | + """Run the backend's event stream to completion with retry/backoff. |
| 77 | +
|
| 78 | + Renders ``TextDelta`` events to stdout, forwards ``ToolEnd`` events |
| 79 | + to the run-hook bridge, retries up to *max_api_retry* times on |
| 80 | + :class:`BackendTimeoutError`, and applies exponential backoff up to |
| 81 | + *max_rate_limit_backoff* seconds on :class:`BackendRateLimitError` |
| 82 | + before giving up with a :class:`BackendTimeoutError`. |
| 83 | + """ |
| 84 | + max_retry = max_api_retry |
| 85 | + rate_limit_backoff = initial_rate_limit_backoff |
| 86 | + last_rate_limit_exc: BackendRateLimitError | None = None |
| 87 | + |
| 88 | + while rate_limit_backoff: |
| 89 | + try: |
| 90 | + stream = backend_impl.run_streamed( |
| 91 | + agent_handle, prompt, max_turns=max_turns |
| 92 | + ) |
| 93 | + stream_iter = stream.__aiter__() |
| 94 | + try: |
| 95 | + while True: |
| 96 | + try: |
| 97 | + event = await asyncio.wait_for( |
| 98 | + stream_iter.__anext__(), timeout=STREAM_IDLE_TIMEOUT |
| 99 | + ) |
| 100 | + except StopAsyncIteration: |
| 101 | + break |
| 102 | + except asyncio.TimeoutError as exc: |
| 103 | + raise BackendTimeoutError( |
| 104 | + f"Backend stream idle for {STREAM_IDLE_TIMEOUT}s" |
| 105 | + ) from exc |
| 106 | + watchdog_ping() |
| 107 | + if isinstance(event, TextDelta): |
| 108 | + await render_model_output( |
| 109 | + event.text, async_task=async_task, task_id=task_id |
| 110 | + ) |
| 111 | + elif isinstance(event, ToolEnd): |
| 112 | + await bridge_copilot_tool_event(event, run_hooks) |
| 113 | + finally: |
| 114 | + # Close the async generator so its finally block runs even |
| 115 | + # if we abort early (timeout / consumer break) — the |
| 116 | + # adapters use that to release backend-native resources. |
| 117 | + aclose = getattr(stream_iter, "aclose", None) |
| 118 | + if aclose is not None: |
| 119 | + try: |
| 120 | + await aclose() |
| 121 | + except Exception: # noqa: BLE001 - best-effort cleanup |
| 122 | + logging.exception("Failed to aclose backend stream iterator") |
| 123 | + await render_model_output("\n\n", async_task=async_task, task_id=task_id) |
| 124 | + return |
| 125 | + except BackendTimeoutError: |
| 126 | + if not max_retry: |
| 127 | + logging.exception("Max retries for BackendTimeoutError reached") |
| 128 | + raise |
| 129 | + max_retry -= 1 |
| 130 | + except BackendRateLimitError as exc: |
| 131 | + last_rate_limit_exc = exc |
| 132 | + if rate_limit_backoff == max_rate_limit_backoff: |
| 133 | + raise BackendTimeoutError("Max rate limit backoff reached") from exc |
| 134 | + if rate_limit_backoff > max_rate_limit_backoff: |
| 135 | + rate_limit_backoff = max_rate_limit_backoff |
| 136 | + else: |
| 137 | + rate_limit_backoff += rate_limit_backoff |
| 138 | + logging.exception(f"Hit rate limit ... holding for {rate_limit_backoff}") |
| 139 | + await asyncio.sleep(rate_limit_backoff) |
| 140 | + |
| 141 | + if last_rate_limit_exc is not None: # pragma: no cover - loop always returns/raises above |
| 142 | + raise BackendTimeoutError("Rate limit backoff exhausted") from last_rate_limit_exc |
0 commit comments