Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions src/claude_code_sdk/_internal/transport/subprocess_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,22 @@ async def read_stderr() -> None:
tg.start_soon(read_stderr)

try:
# store incomplete buffered json output
incomplete_json_line_str = None
async for line in self._stdout_stream:
line_str = line.strip()
if not line_str:
continue

if incomplete_json_line_str:
assert not line_str.startswith("{"), (
"New line must complete incomplete JSON"
)
line_str = incomplete_json_line_str + line_str

# Store parsed JSON output for the line
parsed_json_outputs = []

# Split on newlines in case multiple JSON objects are buffered together
json_lines = line_str.split("\n")

Expand All @@ -198,16 +209,31 @@ async def read_stderr() -> None:

try:
data = json.loads(json_line)
try:
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return

# line has been parsed, reseting incomplete_json_line_str
incomplete_json_line_str = None

# Yield later to avoid duplicates on incomplete line_str
parsed_json_outputs.append(data)
except json.JSONDecodeError as e:
if json_line.startswith("{") or json_line.startswith("["):
raise SDKJSONDecodeError(json_line, e) from e
incomplete_json_line_str = line_str

# raise error only if output is complete JSON but unable to parse
if json_line.endswith("}") or json_line.startswith("]"):
raise SDKJSONDecodeError(json_line, e) from e
continue

if incomplete_json_line_str:
continue

for json_output_data in parsed_json_outputs:
try:
yield json_output_data
except GeneratorExit:
# Handle generator cleanup gracefully
return

except anyio.ClosedResourceError:
pass

Expand Down
147 changes: 147 additions & 0 deletions tests/test_subprocess_buffering.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
from unittest.mock import AsyncMock, MagicMock

import anyio
import pytest

from claude_code_sdk._errors import CLIJSONDecodeError
from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
from claude_code_sdk.types import ClaudeCodeOptions

Expand Down Expand Up @@ -139,3 +141,148 @@ async def _test() -> None:
assert messages[1]["id"] == "res1"

anyio.run(_test)

def test_incomplete_json_across_multiple_lines(self) -> None:
"""Test parsing when JSON is split across multiple lines due to buffering."""

async def _test() -> None:
# Large JSON that gets split across multiple lines
json_obj = {
"type": "assistant",
"message": {
"content": [
{
"type": "text",
"text": "This is a very long response that might get split",
},
{
"type": "tool_use",
"id": "tool_123",
"name": "Read",
"input": {"file_path": "/very/long/path/to/file.py"},
},
]
},
}

complete_json = json.dumps(json_obj)

# Split the JSON at an arbitrary point to simulate buffering
split_point = len(complete_json) // 2
first_part = complete_json[:split_point]
second_part = complete_json[split_point:]

transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)

mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process

# Simulate receiving the JSON in two parts
transport._stdout_stream = MockTextReceiveStream([first_part, second_part])
transport._stderr_stream = MockTextReceiveStream([])

messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)

# Should parse as one complete message
assert len(messages) == 1
assert messages[0]["type"] == "assistant"
assert len(messages[0]["message"]["content"]) == 2

anyio.run(_test)

def test_malformed_complete_json_raises_error(self) -> None:
"""Test that malformed but seemingly complete JSON raises an error."""

async def _test() -> None:
# JSON that looks complete but is malformed
malformed_json = '{"type": "message", "invalid": unquoted_value}'

transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)

mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([malformed_json])
transport._stderr_stream = MockTextReceiveStream([])

# Should raise CLIJSONDecodeError for malformed complete JSON
# The exception will be wrapped in an ExceptionGroup due to anyio task group
with pytest.raises(Exception) as exc_info:
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)

# Verify the actual exception is CLIJSONDecodeError
assert len(exc_info.value.exceptions) == 1
assert isinstance(exc_info.value.exceptions[0], CLIJSONDecodeError)

anyio.run(_test)

def test_no_duplicate_output_when_incomplete_json_followed_by_valid(self) -> None:
"""Test that we don't duplicate output when incomplete JSON is followed by valid JSON."""

async def _test() -> None:
# First valid JSON
valid_json1 = {"type": "user", "message": "first message"}

# Second valid JSON
valid_json2 = {"type": "assistant", "message": "second message"}

# Large JSON that will be incomplete
large_json = {
"type": "result",
"data": "Very large data " * 200, # Make it large enough to split
"status": "completed",
}

valid_str1 = json.dumps(valid_json1)
valid_str2 = json.dumps(valid_json2)
large_str = json.dumps(large_json)

# Split the large JSON
split_point = len(large_str) // 2
large_part1 = large_str[:split_point]
large_part2 = large_str[split_point:]

# Create line that has: valid JSON + newline + valid JSON + newline + incomplete large JSON
combined_line = valid_str1 + "\n" + valid_str2 + "\n" + large_part1

lines = [
combined_line, # First line: 2 valid JSONs + start of large JSON
large_part2, # Second line: completion of large JSON
]

transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)

mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream(lines)
transport._stderr_stream = MockTextReceiveStream([])

messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)

# Should have exactly 3 messages: 2 from first line + 1 completed large JSON
assert len(messages) == 3
assert messages[0]["type"] == "user"
assert messages[0]["message"] == "first message"
assert messages[1]["type"] == "assistant"
assert messages[1]["message"] == "second message"
assert messages[2]["type"] == "result"
assert messages[2]["status"] == "completed"

anyio.run(_test)