Skip to content

Commit 192cdce

Browse files
MarkDaoustcopybara-github
authored andcommitted
fix: pass max_line_length to readline() to prevent LineTooLong on large SSE lines with MTLS.
When using AsyncAuthorizedSession (Vertex AI + ADC), the google-auth library creates its own internal aiohttp.ClientSession with the default read_bufsize=2**16. This gives the underlying StreamReader a _high_water of 131072 bytes, which is the effective limit for readline(). Streaming responses from thinking models can include large thoughtSignature fields, or generated images, that push a single SSE data: line beyond 131072 bytes, causing aiohttp to raise LineTooLong. The fix passes max_line_length=READ_BUFFER_SIZE (4MB) explicitly to every readline() call in _aiter_response_stream(), overriding the limit regardless of which code path created the underlying session. This covers both the direct AiohttpClientSession path (API key auth) and the AsyncAuthorizedSession path (Vertex AI + ADC). Test: added test_aiohttp_large_sse_line_with_thought_signature which uses a mock that enforces the real aiohttp LineTooLong limit, and streams a 150KB SSE line that would previously fail. Manually verified, this test fails before the change. PiperOrigin-RevId: 915454389
1 parent d5a9527 commit 192cdce

2 files changed

Lines changed: 89 additions & 6 deletions

File tree

google/genai/_api_client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,9 @@ async def _aiter_response_stream(self) -> AsyncIterator[str]:
447447
try:
448448
while True:
449449
# Read a line from the stream. This returns bytes.
450-
line_bytes = await self.response_stream.content.readline()
450+
line_bytes = await self.response_stream.content.readline(
451+
max_line_length=READ_BUFFER_SIZE
452+
)
451453
if not line_bytes:
452454
break
453455
# Decode the bytes and remove trailing whitespace and newlines.

google/genai/tests/client/test_async_stream.py

Lines changed: 86 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def __init__(self, lines: List[str]):
7373
self.content.readline.side_effect = self._async_read_line
7474
self.release = MagicMock()
7575

76-
async def _async_read_line(self) -> bytes:
76+
async def _async_read_line(self, **kwargs) -> bytes:
7777
if self._read_pos >= len(self._read_data):
7878
return b"" # End of stream
7979

@@ -240,7 +240,7 @@ async def test_aiohttp_simple_lines(responses: api_client.HttpResponse):
240240
results = [line async for line in responses._aiter_response_stream()]
241241

242242
assert results == lines
243-
mock_response.content.readline.assert_any_call()
243+
mock_response.content.readline.assert_called()
244244
mock_response.release.assert_called_once()
245245

246246

@@ -256,7 +256,7 @@ async def test_aiohttp_data_prefix(responses: api_client.HttpResponse):
256256
results = [line async for line in responses._aiter_response_stream()]
257257

258258
assert results == ["{ 'message': 'hello' }", "{ 'status': 'ok' }"]
259-
mock_response.content.readline.assert_any_call()
259+
mock_response.content.readline.assert_called()
260260
mock_response.release.assert_called_once()
261261

262262

@@ -278,7 +278,7 @@ async def test_aiohttp_multiple_json_chunks(responses: api_client.HttpResponse):
278278
results = [line async for line in responses._aiter_response_stream()]
279279

280280
assert results == ['{ "id": 1 }', '{ "id": 2 }', '{ "id": 3 }']
281-
mock_response.content.readline.assert_any_call()
281+
mock_response.content.readline.assert_called()
282282
mock_response.release.assert_called_once()
283283

284284

@@ -296,7 +296,88 @@ async def test_aiohttp_incomplete_json_at_end(
296296
results = [line async for line in responses._aiter_response_stream()]
297297

298298
assert results == ['{ "partial": "data"']
299-
mock_response.content.readline.assert_any_call()
299+
mock_response.content.readline.assert_called()
300+
mock_response.release.assert_called_once()
301+
302+
303+
class MockAIOHTTPResponseWithLineLimits(aiohttp.ClientResponse):
304+
"""Mock that enforces aiohttp's real readline limits.
305+
306+
Real aiohttp StreamReader raises LineTooLong when a line exceeds
307+
`_high_water` (= limit * 2) bytes. The default limit is 2**16, so lines
308+
over 131072 bytes fail unless `max_line_length` is explicitly passed.
309+
"""
310+
311+
DEFAULT_HIGH_WATER = 2**16 * 2 # 131072, same as aiohttp default
312+
313+
def __init__(self, lines: List[str]):
314+
self.content = MagicMock()
315+
self.content.readline = AsyncMock()
316+
self._read_data = b"\n".join(line.encode("utf-8") for line in lines) + b"\n"
317+
self._read_pos = 0
318+
self.content.readline.side_effect = self._async_read_line
319+
self.release = MagicMock()
320+
321+
async def _async_read_line(
322+
self, *, max_line_length=None
323+
) -> bytes:
324+
if self._read_pos >= len(self._read_data):
325+
return b""
326+
327+
newline_pos = self._read_data.find(b"\n", self._read_pos)
328+
if newline_pos == -1:
329+
line = self._read_data[self._read_pos:]
330+
self._read_pos = len(self._read_data)
331+
else:
332+
line = self._read_data[self._read_pos:newline_pos + 1]
333+
self._read_pos = newline_pos + 1
334+
335+
# Enforce limit like real aiohttp StreamReader.readuntil does
336+
limit = max_line_length or self.DEFAULT_HIGH_WATER
337+
if len(line) > limit:
338+
from aiohttp.http_exceptions import LineTooLong
339+
raise LineTooLong(line[:100] + b"...", limit)
340+
341+
return line
342+
343+
344+
@requires_aiohttp
345+
@pytest.mark.asyncio
346+
async def test_aiohttp_large_sse_line_with_thought_signature(
347+
responses: api_client.HttpResponse,
348+
):
349+
"""Verifies large SSE lines (e.g. thoughtSignature) don't hit LineTooLong.
350+
351+
aiohttp's StreamReader.readline() enforces a maximum line length based on
352+
the session's read_bufsize (default: 2**16), which gives a _high_water limit
353+
of 131072 bytes. Thinking models can return a thoughtSignature field large
354+
enough to push a single SSE data: line past this limit, causing LineTooLong.
355+
356+
The fix passes max_line_length=READ_BUFFER_SIZE (4MB) explicitly on the
357+
readline() call in _aiter_response_stream(), overriding the limit at the
358+
call site regardless of how the underlying aiohttp session was configured.
359+
360+
This test verifies the fix by using a mock that enforces the real aiohttp
361+
readline limit and confirms a 150KB line is streamed successfully.
362+
"""
363+
api_client.has_aiohttp = True
364+
365+
# Build a single SSE line larger than aiohttp's default limit (131072)
366+
large_thought_sig = "A" * 150_000 # > 131072 bytes
367+
large_sse_payload = (
368+
f'{{"candidates": [{{"content": {{"parts": [{{"text": "",'
369+
f'"thoughtSignature": "{large_thought_sig}"}}]}}}}]}}'
370+
)
371+
lines = [f"data: {large_sse_payload}", ""]
372+
373+
mock_response = MockAIOHTTPResponseWithLineLimits(lines)
374+
responses.response_stream = mock_response
375+
376+
results = [line async for line in responses._aiter_response_stream()]
377+
378+
assert len(results) == 1
379+
assert "thoughtSignature" in results[0]
380+
assert large_thought_sig in results[0]
300381
mock_response.release.assert_called_once()
301382

302383

0 commit comments

Comments
 (0)