Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/claude_agent_sdk/_internal/transport/subprocess_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

logger = logging.getLogger(__name__)

_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit
_DEFAULT_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit


class SubprocessCLITransport(Transport):
Expand All @@ -48,6 +48,11 @@ def __init__(
self._stderr_task_group: anyio.abc.TaskGroup | None = None
self._ready = False
self._exit_error: Exception | None = None # Track process exit errors
self._max_buffer_size = (
options.max_buffer_size
if options.max_buffer_size is not None
else _DEFAULT_MAX_BUFFER_SIZE
)

def _find_cli(self) -> str:
"""Find Claude Code CLI binary."""
Expand Down Expand Up @@ -411,12 +416,13 @@ async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]:
# Keep accumulating partial JSON until we can parse it
json_buffer += json_line

if len(json_buffer) > _MAX_BUFFER_SIZE:
if len(json_buffer) > self._max_buffer_size:
buffer_length = len(json_buffer)
json_buffer = ""
raise SDKJSONDecodeError(
f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes",
f"JSON message exceeded maximum buffer size of {self._max_buffer_size} bytes",
ValueError(
f"Buffer size {len(json_buffer)} exceeds limit {_MAX_BUFFER_SIZE}"
f"Buffer size {buffer_length} exceeds limit {self._max_buffer_size}"
),
)

Expand All @@ -427,7 +433,7 @@ async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]:
except json.JSONDecodeError:
# We are speculatively decoding the buffer until we get
# a full JSON object. If there is an actual issue, we
# raise an error after _MAX_BUFFER_SIZE.
# raise an error after exceeding the configured limit.
continue

except anyio.ClosedResourceError:
Expand Down
1 change: 1 addition & 0 deletions src/claude_agent_sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ class ClaudeAgentOptions:
extra_args: dict[str, str | None] = field(
default_factory=dict
) # Pass arbitrary CLI flags
max_buffer_size: int | None = None # Max bytes when buffering CLI stdout
debug_stderr: Any = (
sys.stderr
) # Deprecated: File-like object for debug output. Use stderr callback instead.
Expand Down
32 changes: 30 additions & 2 deletions tests/test_subprocess_buffering.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from claude_agent_sdk._errors import CLIJSONDecodeError
from claude_agent_sdk._internal.transport.subprocess_cli import (
_MAX_BUFFER_SIZE,
_DEFAULT_MAX_BUFFER_SIZE,
SubprocessCLITransport,
)
from claude_agent_sdk.types import ClaudeAgentOptions
Expand Down Expand Up @@ -237,7 +237,7 @@ def test_buffer_size_exceeded(self) -> None:
"""Test that exceeding buffer size raises an appropriate error."""

async def _test() -> None:
huge_incomplete = '{"data": "' + "x" * (_MAX_BUFFER_SIZE + 1000)
huge_incomplete = '{"data": "' + "x" * (_DEFAULT_MAX_BUFFER_SIZE + 1000)

transport = SubprocessCLITransport(
prompt="test", options=ClaudeAgentOptions(), cli_path="/usr/bin/claude"
Expand All @@ -260,6 +260,34 @@ async def _test() -> None:

anyio.run(_test)

def test_buffer_size_option(self) -> None:
"""Test that the configurable buffer size option is respected."""

async def _test() -> None:
custom_limit = 512
huge_incomplete = '{"data": "' + "x" * (custom_limit + 10)

transport = SubprocessCLITransport(
prompt="test",
options=ClaudeAgentOptions(max_buffer_size=custom_limit),
cli_path="/usr/bin/claude",
)

mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([huge_incomplete])
transport._stderr_stream = MockTextReceiveStream([])

with pytest.raises(CLIJSONDecodeError) as exc_info:
async for _ in transport.read_messages():
pass

assert f"maximum buffer size of {custom_limit} bytes" in str(exc_info.value)

anyio.run(_test)

def test_mixed_complete_and_split_json(self) -> None:
"""Test handling a mix of complete and split JSON messages."""

Expand Down
Loading