diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index d403ceddb..3ab780392 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -18,6 +18,8 @@ from . import Transport +_MAX_BUFFER_SIZE = 1 * 1024 * 1024 # 1MB + class SubprocessCLITransport(Transport): """Subprocess transport using Claude Code CLI.""" @@ -182,6 +184,8 @@ async def read_stderr() -> None: async with anyio.create_task_group() as tg: tg.start_soon(read_stderr) + json_buffer = "" + try: async for line in self._stdout_stream: line_str = line.strip() @@ -195,16 +199,23 @@ async def read_stderr() -> None: json_line = json_line.strip() if not json_line: continue - + + json_buffer += json_line + if(len(json_buffer.encode(self._stdout_stream.encoding)) > _MAX_BUFFER_SIZE): + raise OverflowError(f"JSON message buffer exceeded max size: " + f"{_MAX_BUFFER_SIZE/(1024*1024)} MB") + try: - data = json.loads(json_line) + data = json.loads(json_buffer) + json_buffer = "" try: yield data except GeneratorExit: # Handle generator cleanup gracefully return except json.JSONDecodeError as e: - if json_line.startswith("{") or json_line.startswith("["): + if (json_buffer.startswith("{") and json_buffer.endswith("}")) or ( + json_buffer.startswith("[") and json_buffer.endswith("]")): raise SDKJSONDecodeError(json_line, e) from e continue diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py index 06886df28..133378961 100644 --- a/tests/test_subprocess_buffering.py +++ b/tests/test_subprocess_buffering.py @@ -17,6 +17,7 @@ class MockTextReceiveStream: def __init__(self, lines: list[str]) -> None: self.lines = lines self.index = 0 + self.encoding = "utf-8" def __aiter__(self) -> AsyncIterator[str]: return self @@ -139,3 +140,40 @@ async def _test() -> None: assert messages[1]["id"] == "res1" anyio.run(_test) + + def test_objects_over_buffer_limit(self) -> None: + """Test parsing with a json object larger than the buffer limit.""" + + async def _test() -> None: + max_buffer_size = 2**16 # 64KB, default buffer limit for asyncio streams + + # Simulate json object larger than max buffer size + json_obj1 = {"type": "message", "message": "a"* (2 ** 16), "id": "msg1"} + json_obj2 = {"type": "result", "id": "res1"} + + buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2) + + # Lines larger than 64KB are split + buffered_lines = [buffered_line[i:i+max_buffer_size] + for i in range(0, len(buffered_line), max_buffer_size)] + + transport = SubprocessCLITransport( + prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" + ) + + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process + transport._stdout_stream = MockTextReceiveStream(buffered_lines) + transport._stderr_stream = MockTextReceiveStream([]) + + messages: list[Any] = [] + async for msg in transport.receive_messages(): + messages.append(msg) + + assert len(messages) == 2 + assert messages[0]["id"] == "msg1" + assert messages[1]["id"] == "res1" + + anyio.run(_test)