diff --git a/packages/sdk/langs/python/superdoc/client.py b/packages/sdk/langs/python/superdoc/client.py index f187895b04..9291399518 100644 --- a/packages/sdk/langs/python/superdoc/client.py +++ b/packages/sdk/langs/python/superdoc/client.py @@ -22,6 +22,7 @@ DocOpenResult as GeneratedDocOpenResult, ) from .runtime import SuperDocAsyncRuntime, SuperDocSyncRuntime +from .transport import DEFAULT_STDOUT_BUFFER_LIMIT_BYTES UserIdentity = Dict[str, str] @@ -340,6 +341,9 @@ def __init__( request_timeout_ms: int | None = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, + # Raise if a single host response can exceed this size (e.g. reading + # very large documents); otherwise the default is safe. + stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES, default_change_mode: Literal['direct', 'tracked'] | None = None, user: UserIdentity | None = None, ) -> None: @@ -350,6 +354,7 @@ def __init__( request_timeout_ms=request_timeout_ms, watchdog_timeout_ms=watchdog_timeout_ms, max_queue_depth=max_queue_depth, + stdout_buffer_limit_bytes=stdout_buffer_limit_bytes, default_change_mode=default_change_mode, user=user, ) diff --git a/packages/sdk/langs/python/superdoc/runtime.py b/packages/sdk/langs/python/superdoc/runtime.py index e303754502..25dc3727e0 100644 --- a/packages/sdk/langs/python/superdoc/runtime.py +++ b/packages/sdk/langs/python/superdoc/runtime.py @@ -14,7 +14,11 @@ from .embedded_cli import resolve_embedded_cli_path from .generated.contract import OPERATION_INDEX from .protocol import normalize_default_change_mode -from .transport import AsyncHostTransport, SyncHostTransport +from .transport import ( + DEFAULT_STDOUT_BUFFER_LIMIT_BYTES, + AsyncHostTransport, + SyncHostTransport, +) class SuperDocSyncRuntime: @@ -79,6 +83,7 @@ def __init__( request_timeout_ms: Optional[int] = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, + stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES, default_change_mode: 
Optional[str] = None, user: Optional[Dict[str, str]] = None, ) -> None: @@ -93,6 +98,7 @@ def __init__( request_timeout_ms=request_timeout_ms, watchdog_timeout_ms=watchdog_timeout_ms, max_queue_depth=max_queue_depth, + stdout_buffer_limit_bytes=stdout_buffer_limit_bytes, default_change_mode=self._default_change_mode, user=user, ) diff --git a/packages/sdk/langs/python/superdoc/transport.py b/packages/sdk/langs/python/superdoc/transport.py index fec79e9c8e..bacd490b5d 100644 --- a/packages/sdk/langs/python/superdoc/transport.py +++ b/packages/sdk/langs/python/superdoc/transport.py @@ -43,6 +43,12 @@ logger = logging.getLogger('superdoc.transport') +# Default stdout StreamReader buffer for the async transport. Host responses +# are single newline-delimited JSON lines, so this caps the largest individual +# response a caller can receive. Raise it if your workload routinely produces +# responses above this size (e.g. whole-document reads on very large docs). +DEFAULT_STDOUT_BUFFER_LIMIT_BYTES = 64 * 1024 * 1024 + # Opt-in debug logging via SUPERDOC_DEBUG=1 or SUPERDOC_LOG_LEVEL=debug. # Only configures the named logger — never mutates root logging config. 
_log_level = os.environ.get('SUPERDOC_LOG_LEVEL', '').lower() @@ -399,6 +405,7 @@ def __init__( request_timeout_ms: Optional[int] = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, + stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES, default_change_mode: Optional[ChangeMode] = None, user: Optional[Dict[str, str]] = None, ) -> None: @@ -409,11 +416,13 @@ self._request_timeout_ms = request_timeout_ms self._watchdog_timeout_ms = watchdog_timeout_ms self._max_queue_depth = max_queue_depth + self._stdout_buffer_limit_bytes = stdout_buffer_limit_bytes self._default_change_mode = default_change_mode self._user = user self._process: Optional[asyncio.subprocess.Process] = None self._reader_task: Optional[asyncio.Task] = None + self._cleanup_task: Optional[asyncio.Task] = None self._pending: Dict[int, asyncio.Future] = {} self._state = _State.DISCONNECTED self._next_request_id = 1 @@ -531,12 +540,15 @@ async def _start_host(self) -> None: args = [*prefix_args, 'host', '--stdio'] try: + # ``limit`` raises asyncio's StreamReader buffer above its 64 KiB + # default; host responses are single JSON lines and can exceed it. self._process = await asyncio.create_subprocess_exec( command, *args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, env={**os.environ, **self._env}, + limit=self._stdout_buffer_limit_bytes, ) logger.debug('Host spawned (pid=%s, bin=%s).', self._process.pid, self._cli_bin) except Exception as exc: @@ -611,19 +623,35 @@ async def _reader_loop(self) -> None: except asyncio.CancelledError: return + except (asyncio.LimitOverrunError, ValueError) as exc: + # StreamReader signals a line exceeding the stdout buffer limit as + # LimitOverrunError (from readuntil) or a plain ValueError (readline + # wraps the overrun). The host is still alive — we must kill it so a + # later dispose() doesn't short-circuit on DISCONNECTED state. 
+ logger.debug('Reader loop buffer overflow: %s', exc) + if not self._stopping: + self._schedule_cleanup(SuperDocError( + 'Host response exceeded stdout buffer limit. ' + 'Raise stdout_buffer_limit_bytes to accommodate larger responses.', + code=HOST_PROTOCOL_ERROR, + details={ + 'message': str(exc), + 'stdout_buffer_limit_bytes': self._stdout_buffer_limit_bytes, + }, + )) + return except Exception as exc: logger.debug('Reader loop error: %s', exc) - # Reader exited (EOF or error) — reject all pending futures. + # Reader exited (EOF or unexpected error) — tear down the process so + # no orphaned host is left running, then reject pending futures. if not self._stopping: exit_code = process.returncode - error = SuperDocError( + self._schedule_cleanup(SuperDocError( 'Host process disconnected.', code=HOST_DISCONNECTED, details={'exit_code': exit_code, 'signal': None}, - ) - self._reject_all_pending(error) - self._state = _State.DISCONNECTED + )) async def _send_request(self, method: str, params: Any, watchdog_ms: int) -> Any: """Send a JSON-RPC request and await the matching response future.""" @@ -692,6 +720,18 @@ async def _kill_and_reset(self) -> None: SuperDocError('Host process disconnected.', code=HOST_DISCONNECTED), ) + def _schedule_cleanup(self, error: SuperDocError) -> None: + """Fire-and-forget cleanup from inside the reader task. + + Must not be awaited from `_reader_loop` itself — `_cleanup` cancels + and awaits the reader task, which would deadlock. Scheduling it as a + separate task lets the reader return first; by the time cleanup runs + the reader task is already done and the cancel/await is a no-op. 
+ """ + if self._cleanup_task and not self._cleanup_task.done(): + return + self._cleanup_task = asyncio.create_task(self._cleanup(error)) + async def _cleanup(self, error: Optional[SuperDocError]) -> None: """Cancel reader, kill process, reject pending, reset state.""" if self._reader_task and not self._reader_task.done(): diff --git a/packages/sdk/langs/python/tests/test_transport.py b/packages/sdk/langs/python/tests/test_transport.py index d71bafa186..aba75db875 100644 --- a/packages/sdk/langs/python/tests/test_transport.py +++ b/packages/sdk/langs/python/tests/test_transport.py @@ -493,3 +493,85 @@ async def test_reuse_after_dispose(self): await transport.dispose() finally: _cleanup_wrapper(cli2) + + +class TestAsyncLargeResponse: + """Responses larger than the StreamReader buffer must not crash the reader.""" + + @pytest.mark.asyncio + async def test_response_above_asyncio_default_streamreader_limit(self): + big_payload = 'x' * (200 * 1024) + cli = _mock_cli_bin({ + 'handshake': 'ok', + 'responses': [{'data': {'content': big_payload}}], + }) + try: + transport = AsyncHostTransport(cli, startup_timeout_ms=5_000) + await transport.connect() + result = await transport.invoke(_TEST_OP, {'query': 'big'}) + assert result == {'content': big_payload} + assert transport.state == 'CONNECTED' + await transport.dispose() + finally: + _cleanup_wrapper(cli) + + @pytest.mark.asyncio + async def test_response_above_custom_buffer_limit_raises_protocol_error(self): + # Setting stdout_buffer_limit_bytes below the response size should + # surface HOST_PROTOCOL_ERROR (actionable) rather than + # HOST_DISCONNECTED (misleading — the host is still alive), and the + # error should carry a hint to raise the buffer limit. 
+ from superdoc.errors import HOST_PROTOCOL_ERROR + + big_payload = 'x' * (200 * 1024) + cli = _mock_cli_bin({ + 'handshake': 'ok', + 'responses': [{'data': {'content': big_payload}}], + }) + try: + transport = AsyncHostTransport( + cli, + startup_timeout_ms=5_000, + stdout_buffer_limit_bytes=64 * 1024, + ) + await transport.connect() + process = transport._process + assert process is not None + with pytest.raises(SuperDocError) as exc_info: + await transport.invoke(_TEST_OP, {'query': 'big'}) + assert exc_info.value.code == HOST_PROTOCOL_ERROR + assert 'stdout_buffer_limit_bytes' in str(exc_info.value) + + # The host process must be torn down — not just the transport + # state flipped to DISCONNECTED. Otherwise dispose() short-circuits + # and leaves an orphaned host running. + if transport._cleanup_task is not None: + await transport._cleanup_task + assert transport._process is None + assert transport.state == 'DISCONNECTED' + assert process.returncode is not None + + # dispose() after an overflow must be a safe no-op. + await transport.dispose() + finally: + _cleanup_wrapper(cli) + + @pytest.mark.asyncio + async def test_client_threads_stdout_buffer_limit_to_transport(self): + # End-to-end wiring check: the public AsyncSuperDocClient constructor + # must thread stdout_buffer_limit_bytes through SuperDocAsyncRuntime + # into AsyncHostTransport. Without this, a silent drop in client.py + # or runtime.py would leave the existing overflow test passing while + # the public API reverts to the asyncio 64 KiB default. + from superdoc.client import AsyncSuperDocClient + + cli = _mock_cli_bin({'handshake': 'ok'}) + try: + client = AsyncSuperDocClient( + env={'SUPERDOC_CLI_BIN': cli}, + stdout_buffer_limit_bytes=64 * 1024, + ) + transport = client._runtime._transport + assert transport._stdout_buffer_limit_bytes == 64 * 1024 + finally: + _cleanup_wrapper(cli)