Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions src/claude_code_sdk/_internal/transport/subprocess_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from . import Transport


_MAX_BUFFER_SIZE = 1 * 1024 * 1024 # 1MB

class SubprocessCLITransport(Transport):
"""Subprocess transport using Claude Code CLI."""

Expand Down Expand Up @@ -182,6 +184,8 @@ async def read_stderr() -> None:
async with anyio.create_task_group() as tg:
tg.start_soon(read_stderr)

json_buffer = ""

try:
async for line in self._stdout_stream:
line_str = line.strip()
Expand All @@ -195,16 +199,23 @@ async def read_stderr() -> None:
json_line = json_line.strip()
if not json_line:
continue


json_buffer += json_line
if(len(json_buffer.encode(self._stdout_stream.encoding)) > _MAX_BUFFER_SIZE):
raise OverflowError(f"JSON message buffer exceeded max size: "
f"{_MAX_BUFFER_SIZE/(1024*1024)} MB")

try:
data = json.loads(json_line)
data = json.loads(json_buffer)
json_buffer = ""
try:
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
except json.JSONDecodeError as e:
if json_line.startswith("{") or json_line.startswith("["):
if (json_buffer.startswith("{") and json_buffer.endswith("}")) or (
json_buffer.startswith("[") and json_buffer.endswith("]")):
raise SDKJSONDecodeError(json_line, e) from e
continue

Expand Down
38 changes: 38 additions & 0 deletions tests/test_subprocess_buffering.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class MockTextReceiveStream:
def __init__(self, lines: list[str]) -> None:
self.lines = lines
self.index = 0
self.encoding = "utf-8"

def __aiter__(self) -> AsyncIterator[str]:
return self
Expand Down Expand Up @@ -139,3 +140,40 @@ async def _test() -> None:
assert messages[1]["id"] == "res1"

anyio.run(_test)

def test_objects_over_buffer_limit(self) -> None:
"""Test parsing with a json object larger than the buffer limit."""

async def _test() -> None:
max_buffer_size = 2**16 # 64KB, default buffer limit for asyncio streams

# Simulate json object larger than max buffer size
json_obj1 = {"type": "message", "message": "a"* (2 ** 16), "id": "msg1"}
json_obj2 = {"type": "result", "id": "res1"}

buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)

# Lines larger than 64KB are split
buffered_lines = [buffered_line[i:i+max_buffer_size]
for i in range(0, len(buffered_line), max_buffer_size)]

transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)

mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream(buffered_lines)
transport._stderr_stream = MockTextReceiveStream([])

messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)

assert len(messages) == 2
assert messages[0]["id"] == "msg1"
assert messages[1]["id"] == "res1"

anyio.run(_test)