Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/sdk/langs/python/superdoc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
DocOpenResult as GeneratedDocOpenResult,
)
from .runtime import SuperDocAsyncRuntime, SuperDocSyncRuntime
from .transport import DEFAULT_STDOUT_BUFFER_LIMIT_BYTES

UserIdentity = Dict[str, str]

Expand Down Expand Up @@ -340,6 +341,9 @@ def __init__(
request_timeout_ms: int | None = None,
watchdog_timeout_ms: int = 30_000,
max_queue_depth: int = 100,
# Raise if a single host response can exceed this size (e.g. reading
# very large documents); otherwise the default is safe.
stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
default_change_mode: Literal['direct', 'tracked'] | None = None,
user: UserIdentity | None = None,
) -> None:
Expand All @@ -350,6 +354,7 @@ def __init__(
request_timeout_ms=request_timeout_ms,
watchdog_timeout_ms=watchdog_timeout_ms,
max_queue_depth=max_queue_depth,
stdout_buffer_limit_bytes=stdout_buffer_limit_bytes,
default_change_mode=default_change_mode,
user=user,
)
Expand Down
8 changes: 7 additions & 1 deletion packages/sdk/langs/python/superdoc/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
from .embedded_cli import resolve_embedded_cli_path
from .generated.contract import OPERATION_INDEX
from .protocol import normalize_default_change_mode
from .transport import AsyncHostTransport, SyncHostTransport
from .transport import (
DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
AsyncHostTransport,
SyncHostTransport,
)


class SuperDocSyncRuntime:
Expand Down Expand Up @@ -79,6 +83,7 @@ def __init__(
request_timeout_ms: Optional[int] = None,
watchdog_timeout_ms: int = 30_000,
max_queue_depth: int = 100,
stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
default_change_mode: Optional[str] = None,
user: Optional[Dict[str, str]] = None,
) -> None:
Expand All @@ -93,6 +98,7 @@ def __init__(
request_timeout_ms=request_timeout_ms,
watchdog_timeout_ms=watchdog_timeout_ms,
max_queue_depth=max_queue_depth,
stdout_buffer_limit_bytes=stdout_buffer_limit_bytes,
default_change_mode=self._default_change_mode,
user=user,
)
Expand Down
50 changes: 45 additions & 5 deletions packages/sdk/langs/python/superdoc/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@

logger = logging.getLogger('superdoc.transport')

# Default stdout StreamReader buffer for the async transport. Host responses
# are single newline-delimited JSON lines, so this caps the largest individual
# response a caller can receive. Raise it if your workload routinely produces
# responses above this size (e.g. whole-document reads on very large docs).
DEFAULT_STDOUT_BUFFER_LIMIT_BYTES = 64 * 1024 * 1024

# Opt-in debug logging via SUPERDOC_DEBUG=1 or SUPERDOC_LOG_LEVEL=debug.
# Only configures the named logger — never mutates root logging config.
_log_level = os.environ.get('SUPERDOC_LOG_LEVEL', '').lower()
Expand Down Expand Up @@ -399,6 +405,7 @@ def __init__(
request_timeout_ms: Optional[int] = None,
watchdog_timeout_ms: int = 30_000,
max_queue_depth: int = 100,
stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
default_change_mode: Optional[ChangeMode] = None,
user: Optional[Dict[str, str]] = None,
) -> None:
Expand All @@ -409,11 +416,13 @@ def __init__(
self._request_timeout_ms = request_timeout_ms
self._watchdog_timeout_ms = watchdog_timeout_ms
self._max_queue_depth = max_queue_depth
self._stdout_buffer_limit_bytes = stdout_buffer_limit_bytes
self._default_change_mode = default_change_mode
self._user = user

self._process: Optional[asyncio.subprocess.Process] = None
self._reader_task: Optional[asyncio.Task] = None
self._cleanup_task: Optional[asyncio.Task] = None
self._pending: Dict[int, asyncio.Future] = {}
self._state = _State.DISCONNECTED
self._next_request_id = 1
Expand Down Expand Up @@ -531,12 +540,15 @@ async def _start_host(self) -> None:
args = [*prefix_args, 'host', '--stdio']

try:
# ``limit`` raises asyncio's StreamReader buffer above its 64 KiB
# default; host responses are single JSON lines and can exceed it.
self._process = await asyncio.create_subprocess_exec(
command, *args,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
env={**os.environ, **self._env},
limit=self._stdout_buffer_limit_bytes,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this exposes the bug down at transport.py:619-631 — see the summary. the fix belongs in that branch, not here.

)
logger.debug('Host spawned (pid=%s, bin=%s).', self._process.pid, self._cli_bin)
except Exception as exc:
Expand Down Expand Up @@ -611,19 +623,35 @@ async def _reader_loop(self) -> None:

except asyncio.CancelledError:
return
except (asyncio.LimitOverrunError, ValueError) as exc:
# StreamReader raises LimitOverrunError (a ValueError subclass on
# some Python versions) when a single line exceeds the stdout
# buffer limit. The host is still alive — we must kill it so a
# later dispose() doesn't short-circuit on DISCONNECTED state.
logger.debug('Reader loop buffer overflow: %s', exc)
if not self._stopping:
self._schedule_cleanup(SuperDocError(
'Host response exceeded stdout buffer limit. '
'Raise stdout_buffer_limit_bytes to accommodate larger responses.',
code=HOST_PROTOCOL_ERROR,
details={
'message': str(exc),
'stdout_buffer_limit_bytes': self._stdout_buffer_limit_bytes,
},
))
return
except Exception as exc:
logger.debug('Reader loop error: %s', exc)

# Reader exited (EOF or error) — reject all pending futures.
# Reader exited (EOF or unexpected error) — tear down the process so
# no orphaned host is left running, then reject pending futures.
if not self._stopping:
exit_code = process.returncode
error = SuperDocError(
self._schedule_cleanup(SuperDocError(
'Host process disconnected.',
code=HOST_DISCONNECTED,
details={'exit_code': exit_code, 'signal': None},
)
self._reject_all_pending(error)
self._state = _State.DISCONNECTED
))

async def _send_request(self, method: str, params: Any, watchdog_ms: int) -> Any:
"""Send a JSON-RPC request and await the matching response future."""
Expand Down Expand Up @@ -692,6 +720,18 @@ async def _kill_and_reset(self) -> None:
SuperDocError('Host process disconnected.', code=HOST_DISCONNECTED),
)

def _schedule_cleanup(self, error: SuperDocError) -> None:
    """Kick off `_cleanup(error)` as a detached task, at most once.

    Called from inside the reader task. `_cleanup` cancels and awaits the
    reader task, so awaiting it here would deadlock on ourselves; spawning
    it as an independent task lets `_reader_loop` return first, after which
    the cancel/await inside `_cleanup` is a harmless no-op. A cleanup that
    is already in flight is left alone rather than duplicated.
    """
    pending = self._cleanup_task
    if pending is not None and not pending.done():
        # A previous failure already triggered teardown; don't race it.
        return
    self._cleanup_task = asyncio.create_task(self._cleanup(error))

async def _cleanup(self, error: Optional[SuperDocError]) -> None:
"""Cancel reader, kill process, reject pending, reset state."""
if self._reader_task and not self._reader_task.done():
Expand Down
82 changes: 82 additions & 0 deletions packages/sdk/langs/python/tests/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,3 +493,85 @@ async def test_reuse_after_dispose(self):
await transport.dispose()
finally:
_cleanup_wrapper(cli2)


class TestAsyncLargeResponse:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both tests build AsyncHostTransport directly, so if someone drops the arg in client.py or runtime.py the public API breaks silently while these still pass. worth one small test that builds AsyncSuperDocClient(env={'SUPERDOC_CLI_BIN': cli}, stdout_buffer_limit_bytes=64*1024) and checks the value reaches the transport.

"""Responses larger than the StreamReader buffer must not crash the reader."""

@pytest.mark.asyncio
async def test_response_above_asyncio_default_streamreader_limit(self):
    """A 200 KiB single-line response (above asyncio's 64 KiB StreamReader
    default) must round-trip intact and leave the transport connected."""
    payload = 'x' * (200 * 1024)
    script = {
        'handshake': 'ok',
        'responses': [{'data': {'content': payload}}],
    }
    cli = _mock_cli_bin(script)
    try:
        transport = AsyncHostTransport(cli, startup_timeout_ms=5_000)
        await transport.connect()
        reply = await transport.invoke(_TEST_OP, {'query': 'big'})
        assert reply == {'content': payload}
        assert transport.state == 'CONNECTED'
        await transport.dispose()
    finally:
        _cleanup_wrapper(cli)

@pytest.mark.asyncio
async def test_response_above_custom_buffer_limit_raises_protocol_error(self):
    """Overflowing a caller-chosen stdout_buffer_limit_bytes must surface
    HOST_PROTOCOL_ERROR (actionable) rather than HOST_DISCONNECTED
    (misleading — the host is still alive at that point), carry a hint
    naming the knob to raise, and fully tear the host process down."""
    from superdoc.errors import HOST_PROTOCOL_ERROR

    payload = 'x' * (200 * 1024)
    cli = _mock_cli_bin({
        'handshake': 'ok',
        'responses': [{'data': {'content': payload}}],
    })
    try:
        transport = AsyncHostTransport(
            cli,
            startup_timeout_ms=5_000,
            stdout_buffer_limit_bytes=64 * 1024,
        )
        await transport.connect()
        process = transport._process
        assert process is not None

        with pytest.raises(SuperDocError) as exc_info:
            await transport.invoke(_TEST_OP, {'query': 'big'})
        error = exc_info.value
        assert error.code == HOST_PROTOCOL_ERROR
        assert 'stdout_buffer_limit_bytes' in str(error)

        # Teardown must kill the process itself — merely flipping the state
        # to DISCONNECTED would make dispose() short-circuit and orphan the
        # host. Await the background cleanup task before checking.
        if transport._cleanup_task is not None:
            await transport._cleanup_task
        assert transport._process is None
        assert transport.state == 'DISCONNECTED'
        assert process.returncode is not None

        # After an overflow, dispose() must be a safe no-op.
        await transport.dispose()
    finally:
        _cleanup_wrapper(cli)

@pytest.mark.asyncio
async def test_client_threads_stdout_buffer_limit_to_transport(self):
    """Wiring check for the public constructor: AsyncSuperDocClient must
    thread stdout_buffer_limit_bytes through SuperDocAsyncRuntime into
    AsyncHostTransport. If either layer silently dropped the argument, the
    overflow test above would keep passing while the public API reverted
    to asyncio's 64 KiB default."""
    from superdoc.client import AsyncSuperDocClient

    cli = _mock_cli_bin({'handshake': 'ok'})
    try:
        limit = 64 * 1024
        client = AsyncSuperDocClient(
            env={'SUPERDOC_CLI_BIN': cli},
            stdout_buffer_limit_bytes=limit,
        )
        transport = client._runtime._transport
        assert transport._stdout_buffer_limit_bytes == limit
    finally:
        _cleanup_wrapper(cli)
Comment on lines +531 to +557
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no await transport.dispose() on this path, so the host is left running after the test. wrap in try/finally like the sibling tests:

Suggested change
try:
transport = AsyncHostTransport(
cli,
startup_timeout_ms=5_000,
stdout_buffer_limit_bytes=64 * 1024,
)
await transport.connect()
with pytest.raises(SuperDocError) as exc_info:
await transport.invoke(_TEST_OP, {'query': 'big'})
assert exc_info.value.code == HOST_DISCONNECTED
finally:
_cleanup_wrapper(cli)
try:
transport = AsyncHostTransport(
cli,
startup_timeout_ms=5_000,
stdout_buffer_limit_bytes=64 * 1024,
)
await transport.connect()
with pytest.raises(SuperDocError) as exc_info:
await transport.invoke(_TEST_OP, {'query': 'big'})
assert exc_info.value.code == HOST_DISCONNECTED
await transport.dispose()
finally:
_cleanup_wrapper(cli)

Loading