From 56a3e065e0492253b200c62ac1df4f947d099287 Mon Sep 17 00:00:00 2001 From: michaelreavant Date: Thu, 9 Apr 2026 14:28:54 +0200 Subject: [PATCH 1/3] fix(sdk): raise asyncio StreamReader buffer in Python AsyncHostTransport MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Python async transport spawned the host CLI without passing a `limit=` to `asyncio.create_subprocess_exec`, so its stdout `StreamReader` inherited asyncio's default 64 KiB buffer. Every host response is written as a single newline-delimited JSON line, so any `cli.invoke` whose serialized result exceeds 64 KiB (e.g. `superdoc_get_content` on larger documents) caused `readline()` to raise `ValueError: Separator is not found, and chunk exceed the limit` inside `_reader_loop`. The exception was caught by the generic reader-loop handler and pending requests were rejected with the misleading `HOST_DISCONNECTED` error — even though the host process was still alive and healthy. Pass `limit=` to `create_subprocess_exec` and expose it as a new `stdout_buffer_limit_bytes` constructor option on `AsyncHostTransport`, threaded through `SuperDocAsyncRuntime` and `AsyncSuperDocClient`. The default of 64 MiB safely covers the host's own 32 MiB `DEFAULT_MAX_STDIN_BYTES` input cap with room for ~2x JSON expansion. `SyncHostTransport` is unaffected — it uses raw blocking `subprocess.Popen` which has no asyncio buffer limit. Adds a `TestAsyncLargeResponse` regression suite that: 1. Round-trips a 200 KB response through the default-configured transport. 2. Pins that an explicitly tightened `stdout_buffer_limit_bytes` still reproduces the original failure mode, guaranteeing the option is wired through to `create_subprocess_exec`. 
--- packages/sdk/langs/python/superdoc/client.py | 2 + packages/sdk/langs/python/superdoc/runtime.py | 2 + .../sdk/langs/python/superdoc/transport.py | 5 +++ .../sdk/langs/python/tests/test_transport.py | 43 +++++++++++++++++++ 4 files changed, 52 insertions(+) diff --git a/packages/sdk/langs/python/superdoc/client.py b/packages/sdk/langs/python/superdoc/client.py index f187895b04..122e437a78 100644 --- a/packages/sdk/langs/python/superdoc/client.py +++ b/packages/sdk/langs/python/superdoc/client.py @@ -340,6 +340,7 @@ def __init__( request_timeout_ms: int | None = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, + stdout_buffer_limit_bytes: int = 64 * 1024 * 1024, default_change_mode: Literal['direct', 'tracked'] | None = None, user: UserIdentity | None = None, ) -> None: @@ -350,6 +351,7 @@ def __init__( request_timeout_ms=request_timeout_ms, watchdog_timeout_ms=watchdog_timeout_ms, max_queue_depth=max_queue_depth, + stdout_buffer_limit_bytes=stdout_buffer_limit_bytes, default_change_mode=default_change_mode, user=user, ) diff --git a/packages/sdk/langs/python/superdoc/runtime.py b/packages/sdk/langs/python/superdoc/runtime.py index e303754502..a725578e0d 100644 --- a/packages/sdk/langs/python/superdoc/runtime.py +++ b/packages/sdk/langs/python/superdoc/runtime.py @@ -79,6 +79,7 @@ def __init__( request_timeout_ms: Optional[int] = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, + stdout_buffer_limit_bytes: int = 64 * 1024 * 1024, default_change_mode: Optional[str] = None, user: Optional[Dict[str, str]] = None, ) -> None: @@ -93,6 +94,7 @@ def __init__( request_timeout_ms=request_timeout_ms, watchdog_timeout_ms=watchdog_timeout_ms, max_queue_depth=max_queue_depth, + stdout_buffer_limit_bytes=stdout_buffer_limit_bytes, default_change_mode=self._default_change_mode, user=user, ) diff --git a/packages/sdk/langs/python/superdoc/transport.py b/packages/sdk/langs/python/superdoc/transport.py index fec79e9c8e..93945c501e 100644 
--- a/packages/sdk/langs/python/superdoc/transport.py +++ b/packages/sdk/langs/python/superdoc/transport.py @@ -399,6 +399,7 @@ def __init__( request_timeout_ms: Optional[int] = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, + stdout_buffer_limit_bytes: int = 64 * 1024 * 1024, default_change_mode: Optional[ChangeMode] = None, user: Optional[Dict[str, str]] = None, ) -> None: @@ -409,6 +410,7 @@ def __init__( self._request_timeout_ms = request_timeout_ms self._watchdog_timeout_ms = watchdog_timeout_ms self._max_queue_depth = max_queue_depth + self._stdout_buffer_limit_bytes = stdout_buffer_limit_bytes self._default_change_mode = default_change_mode self._user = user @@ -531,12 +533,15 @@ async def _start_host(self) -> None: args = [*prefix_args, 'host', '--stdio'] try: + # ``limit`` raises asyncio's StreamReader buffer above its 64 KiB + # default; host responses are single JSON lines and can exceed it. self._process = await asyncio.create_subprocess_exec( command, *args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, env={**os.environ, **self._env}, + limit=self._stdout_buffer_limit_bytes, ) logger.debug('Host spawned (pid=%s, bin=%s).', self._process.pid, self._cli_bin) except Exception as exc: diff --git a/packages/sdk/langs/python/tests/test_transport.py b/packages/sdk/langs/python/tests/test_transport.py index d71bafa186..bbda79ad5a 100644 --- a/packages/sdk/langs/python/tests/test_transport.py +++ b/packages/sdk/langs/python/tests/test_transport.py @@ -493,3 +493,46 @@ async def test_reuse_after_dispose(self): await transport.dispose() finally: _cleanup_wrapper(cli2) + + +class TestAsyncLargeResponse: + """Responses larger than the StreamReader buffer must not crash the reader.""" + + @pytest.mark.asyncio + async def test_response_above_default_64kb_buffer(self): + big_payload = 'x' * (200 * 1024) + cli = _mock_cli_bin({ + 'handshake': 'ok', + 'responses': [{'data': {'content': 
big_payload}}], + }) + try: + transport = AsyncHostTransport(cli, startup_timeout_ms=5_000) + await transport.connect() + result = await transport.invoke(_TEST_OP, {'query': 'big'}) + assert result == {'content': big_payload} + assert transport.state == 'CONNECTED' + await transport.dispose() + finally: + _cleanup_wrapper(cli) + + @pytest.mark.asyncio + async def test_response_above_custom_buffer_limit_disconnects(self): + # Pins that stdout_buffer_limit_bytes is wired through to the spawn: + # setting it below the response size reproduces the original failure. + big_payload = 'x' * (200 * 1024) + cli = _mock_cli_bin({ + 'handshake': 'ok', + 'responses': [{'data': {'content': big_payload}}], + }) + try: + transport = AsyncHostTransport( + cli, + startup_timeout_ms=5_000, + stdout_buffer_limit_bytes=64 * 1024, + ) + await transport.connect() + with pytest.raises(SuperDocError) as exc_info: + await transport.invoke(_TEST_OP, {'query': 'big'}) + assert exc_info.value.code == HOST_DISCONNECTED + finally: + _cleanup_wrapper(cli) From ba392e67a787088d558894ef70026aca1d74c0d3 Mon Sep 17 00:00:00 2001 From: michaelreavant Date: Tue, 14 Apr 2026 09:36:03 +0200 Subject: [PATCH 2/3] fix(sdk): tear down host process on async reader-loop failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AsyncHostTransport._reader_loop caught reader exceptions by rejecting pending futures and flipping state to DISCONNECTED, but never killed self._process. Because dispose() early-returns on DISCONNECTED, any reader-loop failure left an orphaned host subprocess running with no public API to reap it. This is a pre-existing bug, but the previous commit made it easier to trip by exposing stdout_buffer_limit_bytes: any caller who sets it below their real response size hits the orphan path. 
Route both the buffer-overflow and generic-error branches through a new _schedule_cleanup helper that fires _cleanup() as a separate task (it can't be awaited inline — _cleanup cancels and awaits the reader task itself). _cleanup kills the process, waits on it, rejects pending, and only then transitions to DISCONNECTED, so a subsequent dispose() is a safe no-op instead of leaking the host. Also catch asyncio.LimitOverrunError / ValueError separately and surface HOST_PROTOCOL_ERROR with a "raise stdout_buffer_limit_bytes" hint plus the current limit in details. The previous HOST_DISCONNECTED code pointed users at the wrong problem since the host was still alive. Extends TestAsyncLargeResponse to assert HOST_PROTOCOL_ERROR, verify the hint is in the message, confirm the subprocess is actually reaped (returncode set, _process cleared), and that dispose() after an overflow is a safe no-op. --- .../sdk/langs/python/superdoc/transport.py | 39 ++++++++++++++++--- .../sdk/langs/python/tests/test_transport.py | 27 +++++++++++-- 2 files changed, 57 insertions(+), 9 deletions(-) diff --git a/packages/sdk/langs/python/superdoc/transport.py b/packages/sdk/langs/python/superdoc/transport.py index 93945c501e..d43ce04aac 100644 --- a/packages/sdk/langs/python/superdoc/transport.py +++ b/packages/sdk/langs/python/superdoc/transport.py @@ -416,6 +416,7 @@ def __init__( self._process: Optional[asyncio.subprocess.Process] = None self._reader_task: Optional[asyncio.Task] = None + self._cleanup_task: Optional[asyncio.Task] = None self._pending: Dict[int, asyncio.Future] = {} self._state = _State.DISCONNECTED self._next_request_id = 1 @@ -616,19 +617,35 @@ async def _reader_loop(self) -> None: except asyncio.CancelledError: return + except (asyncio.LimitOverrunError, ValueError) as exc: + # StreamReader.readline() raises ValueError (wrapping an internal + # LimitOverrunError) when a single line exceeds the stdout + # buffer limit. 
The host is still alive — we must kill it so a + # later dispose() doesn't short-circuit on DISCONNECTED state. + logger.debug('Reader loop buffer overflow: %s', exc) + if not self._stopping: + self._schedule_cleanup(SuperDocError( + 'Host response exceeded stdout buffer limit. ' + 'Raise stdout_buffer_limit_bytes to accommodate larger responses.', + code=HOST_PROTOCOL_ERROR, + details={ + 'message': str(exc), + 'stdout_buffer_limit_bytes': self._stdout_buffer_limit_bytes, + }, + )) + return except Exception as exc: logger.debug('Reader loop error: %s', exc) - # Reader exited (EOF or error) — reject all pending futures. + # Reader exited (EOF or unexpected error) — tear down the process so + # no orphaned host is left running, then reject pending futures. if not self._stopping: exit_code = process.returncode - error = SuperDocError( + self._schedule_cleanup(SuperDocError( 'Host process disconnected.', code=HOST_DISCONNECTED, details={'exit_code': exit_code, 'signal': None}, - ) - self._reject_all_pending(error) - self._state = _State.DISCONNECTED + )) async def _send_request(self, method: str, params: Any, watchdog_ms: int) -> Any: """Send a JSON-RPC request and await the matching response future.""" @@ -697,6 +714,18 @@ async def _kill_and_reset(self) -> None: SuperDocError('Host process disconnected.', code=HOST_DISCONNECTED), ) + def _schedule_cleanup(self, error: SuperDocError) -> None: + """Fire-and-forget cleanup from inside the reader task. + + Must not be awaited from `_reader_loop` itself — `_cleanup` cancels + and awaits the reader task, which would deadlock. Scheduling it as a + separate task lets the reader return first; by the time cleanup runs + the reader task is already done and the cancel/await is a no-op. 
+ """ + if self._cleanup_task and not self._cleanup_task.done(): + return + self._cleanup_task = asyncio.create_task(self._cleanup(error)) + async def _cleanup(self, error: Optional[SuperDocError]) -> None: """Cancel reader, kill process, reject pending, reset state.""" if self._reader_task and not self._reader_task.done(): diff --git a/packages/sdk/langs/python/tests/test_transport.py b/packages/sdk/langs/python/tests/test_transport.py index bbda79ad5a..f0de75f20e 100644 --- a/packages/sdk/langs/python/tests/test_transport.py +++ b/packages/sdk/langs/python/tests/test_transport.py @@ -516,9 +516,13 @@ async def test_response_above_default_64kb_buffer(self): _cleanup_wrapper(cli) @pytest.mark.asyncio - async def test_response_above_custom_buffer_limit_disconnects(self): - # Pins that stdout_buffer_limit_bytes is wired through to the spawn: - # setting it below the response size reproduces the original failure. + async def test_response_above_custom_buffer_limit_raises_protocol_error(self): + # Setting stdout_buffer_limit_bytes below the response size should + # surface HOST_PROTOCOL_ERROR (actionable) rather than + # HOST_DISCONNECTED (misleading — the host is still alive), and the + # error should carry a hint to raise the buffer limit. + from superdoc.errors import HOST_PROTOCOL_ERROR + big_payload = 'x' * (200 * 1024) cli = _mock_cli_bin({ 'handshake': 'ok', @@ -531,8 +535,23 @@ async def test_response_above_custom_buffer_limit_disconnects(self): stdout_buffer_limit_bytes=64 * 1024, ) await transport.connect() + process = transport._process + assert process is not None with pytest.raises(SuperDocError) as exc_info: await transport.invoke(_TEST_OP, {'query': 'big'}) - assert exc_info.value.code == HOST_DISCONNECTED + assert exc_info.value.code == HOST_PROTOCOL_ERROR + assert 'stdout_buffer_limit_bytes' in str(exc_info.value) + + # The host process must be torn down — not just the transport + # state flipped to DISCONNECTED. 
Otherwise dispose() short-circuits + # and leaves an orphaned host running. + if transport._cleanup_task is not None: + await transport._cleanup_task + assert transport._process is None + assert transport.state == 'DISCONNECTED' + assert process.returncode is not None + + # dispose() after an overflow must be a safe no-op. + await transport.dispose() finally: _cleanup_wrapper(cli) From 4ffd1e7bd30e78f2068951f6f2b31ec776970cba Mon Sep 17 00:00:00 2001 From: michaelreavant Date: Tue, 14 Apr 2026 09:45:07 +0200 Subject: [PATCH 3/3] refactor(sdk): dedupe stdout_buffer_limit default and add wiring test Address review follow-ups on the async transport buffer-limit option. - Hoist DEFAULT_STDOUT_BUFFER_LIMIT_BYTES (64 MiB) to module scope in transport.py and reference it from AsyncHostTransport, the async runtime, and AsyncSuperDocClient so the default lives in one place instead of three copies of 64 * 1024 * 1024. - Add a short "raise if a single host response can exceed this size" comment on the client.py parameter so callers see the guidance at the public API boundary, not buried in transport.py. - Rename test_response_above_default_64kb_buffer to test_response_above_asyncio_default_streamreader_limit. 64 KiB is asyncio's default, not the SDK's (which is now 64 MiB), so the old name read backwards after this PR. - Add test_client_threads_stdout_buffer_limit_to_transport: builds AsyncSuperDocClient with a custom limit and asserts the value reaches AsyncHostTransport. Without this, a silent drop of the arg in client.py or runtime.py would leave the existing overflow test passing while the public API reverts to the asyncio 64 KiB default. 
--- packages/sdk/langs/python/superdoc/client.py | 5 ++++- packages/sdk/langs/python/superdoc/runtime.py | 8 +++++-- .../sdk/langs/python/superdoc/transport.py | 8 ++++++- .../sdk/langs/python/tests/test_transport.py | 22 ++++++++++++++++++- 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/packages/sdk/langs/python/superdoc/client.py b/packages/sdk/langs/python/superdoc/client.py index 122e437a78..9291399518 100644 --- a/packages/sdk/langs/python/superdoc/client.py +++ b/packages/sdk/langs/python/superdoc/client.py @@ -22,6 +22,7 @@ DocOpenResult as GeneratedDocOpenResult, ) from .runtime import SuperDocAsyncRuntime, SuperDocSyncRuntime +from .transport import DEFAULT_STDOUT_BUFFER_LIMIT_BYTES UserIdentity = Dict[str, str] @@ -340,7 +341,9 @@ def __init__( request_timeout_ms: int | None = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, - stdout_buffer_limit_bytes: int = 64 * 1024 * 1024, + # Raise if a single host response can exceed this size (e.g. reading + # very large documents); otherwise the default is safe. 
+ stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES, default_change_mode: Literal['direct', 'tracked'] | None = None, user: UserIdentity | None = None, ) -> None: diff --git a/packages/sdk/langs/python/superdoc/runtime.py b/packages/sdk/langs/python/superdoc/runtime.py index a725578e0d..25dc3727e0 100644 --- a/packages/sdk/langs/python/superdoc/runtime.py +++ b/packages/sdk/langs/python/superdoc/runtime.py @@ -14,7 +14,11 @@ from .embedded_cli import resolve_embedded_cli_path from .generated.contract import OPERATION_INDEX from .protocol import normalize_default_change_mode -from .transport import AsyncHostTransport, SyncHostTransport +from .transport import ( + DEFAULT_STDOUT_BUFFER_LIMIT_BYTES, + AsyncHostTransport, + SyncHostTransport, +) class SuperDocSyncRuntime: @@ -79,7 +83,7 @@ def __init__( request_timeout_ms: Optional[int] = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, - stdout_buffer_limit_bytes: int = 64 * 1024 * 1024, + stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES, default_change_mode: Optional[str] = None, user: Optional[Dict[str, str]] = None, ) -> None: diff --git a/packages/sdk/langs/python/superdoc/transport.py b/packages/sdk/langs/python/superdoc/transport.py index d43ce04aac..bacd490b5d 100644 --- a/packages/sdk/langs/python/superdoc/transport.py +++ b/packages/sdk/langs/python/superdoc/transport.py @@ -43,6 +43,12 @@ logger = logging.getLogger('superdoc.transport') +# Default stdout StreamReader buffer for the async transport. Host responses +# are single newline-delimited JSON lines, so this caps the largest individual +# response a caller can receive. Raise it if your workload routinely produces +# responses above this size (e.g. whole-document reads on very large docs). +DEFAULT_STDOUT_BUFFER_LIMIT_BYTES = 64 * 1024 * 1024 + # Opt-in debug logging via SUPERDOC_DEBUG=1 or SUPERDOC_LOG_LEVEL=debug. # Only configures the named logger — never mutates root logging config. 
_log_level = os.environ.get('SUPERDOC_LOG_LEVEL', '').lower() @@ -399,7 +405,7 @@ def __init__( request_timeout_ms: Optional[int] = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, - stdout_buffer_limit_bytes: int = 64 * 1024 * 1024, + stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES, default_change_mode: Optional[ChangeMode] = None, user: Optional[Dict[str, str]] = None, ) -> None: diff --git a/packages/sdk/langs/python/tests/test_transport.py b/packages/sdk/langs/python/tests/test_transport.py index f0de75f20e..aba75db875 100644 --- a/packages/sdk/langs/python/tests/test_transport.py +++ b/packages/sdk/langs/python/tests/test_transport.py @@ -499,7 +499,7 @@ class TestAsyncLargeResponse: """Responses larger than the StreamReader buffer must not crash the reader.""" @pytest.mark.asyncio - async def test_response_above_default_64kb_buffer(self): + async def test_response_above_asyncio_default_streamreader_limit(self): big_payload = 'x' * (200 * 1024) cli = _mock_cli_bin({ 'handshake': 'ok', @@ -555,3 +555,23 @@ async def test_response_above_custom_buffer_limit_raises_protocol_error(self): await transport.dispose() finally: _cleanup_wrapper(cli) + + @pytest.mark.asyncio + async def test_client_threads_stdout_buffer_limit_to_transport(self): + # End-to-end wiring check: the public AsyncSuperDocClient constructor + # must thread stdout_buffer_limit_bytes through SuperDocAsyncRuntime + # into AsyncHostTransport. Without this, a silent drop in client.py + # or runtime.py would leave the existing overflow test passing while + # the public API reverts to the asyncio 64 KiB default. + from superdoc.client import AsyncSuperDocClient + + cli = _mock_cli_bin({'handshake': 'ok'}) + try: + client = AsyncSuperDocClient( + env={'SUPERDOC_CLI_BIN': cli}, + stdout_buffer_limit_bytes=64 * 1024, + ) + transport = client._runtime._transport + assert transport._stdout_buffer_limit_bytes == 64 * 1024 + finally: + _cleanup_wrapper(cli)