Skip to content

Commit a40a7ba

Browse files
committed
fix: update stream end condition to use grpc.aio.EOF in _stream_multiplexer
1 parent 9917f8d commit a40a7ba

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

packages/google-cloud-storage/google/cloud/storage/asyncio/_stream_multiplexer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from __future__ import annotations
1616

1717
import asyncio
18+
import grpc
1819
import logging
1920
from typing import Awaitable, Callable, Dict, Optional, Set
2021

@@ -121,7 +122,7 @@ async def _recv_loop(self) -> None:
121122
try:
122123
while True:
123124
response = await self._stream.recv()
124-
if response is None:
125+
if response == grpc.aio.EOF:
125126
sentinel = _StreamEnd()
126127
await asyncio.gather(
127128
*(

packages/google-cloud-storage/tests/unit/asyncio/test_stream_multiplexer.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import asyncio
16+
import grpc
1617
from unittest.mock import AsyncMock
1718

1819
import pytest
@@ -196,7 +197,7 @@ async def test_routes_response_by_read_id(self):
196197
mock_stream = AsyncMock()
197198
resp1 = _make_response(read_id=10, data=b"hello")
198199
resp2 = _make_response(read_id=20, data=b"world")
199-
mock_stream.recv = AsyncMock(side_effect=[resp1, resp2, None])
200+
mock_stream.recv = AsyncMock(side_effect=[resp1, resp2, grpc.aio.EOF])
200201

201202
mux = _StreamMultiplexer(mock_stream)
202203
q1 = mux.register({10})
@@ -222,7 +223,7 @@ async def test_deduplicates_when_multiple_read_ids_map_to_same_queue(self):
222223
# Given a multiplexer with multiple read IDs mapped to the same queue
223224
mock_stream = AsyncMock()
224225
resp = _make_multi_range_response([10, 11])
225-
mock_stream.recv = AsyncMock(side_effect=[resp, None])
226+
mock_stream.recv = AsyncMock(side_effect=[resp, grpc.aio.EOF])
226227

227228
mux = _StreamMultiplexer(mock_stream)
228229
queue = mux.register({10, 11})
@@ -244,7 +245,7 @@ async def test_metadata_only_response_broadcast_to_all(self):
244245
metadata_resp = _storage_v2.BidiReadObjectResponse(
245246
read_handle=_storage_v2.BidiReadHandle(handle=b"handle")
246247
)
247-
mock_stream.recv = AsyncMock(side_effect=[metadata_resp, None])
248+
mock_stream.recv = AsyncMock(side_effect=[metadata_resp, grpc.aio.EOF])
248249

249250
mux = _StreamMultiplexer(mock_stream)
250251
q1 = mux.register({10})
@@ -263,7 +264,7 @@ async def test_metadata_only_response_broadcast_to_all(self):
263264
async def test_stream_end_sends_sentinel_to_all_queues(self):
264265
# Given a multiplexer with multiple registered queues and a stream that ends immediately
265266
mock_stream = AsyncMock()
266-
mock_stream.recv = AsyncMock(return_value=None)
267+
mock_stream.recv = AsyncMock(return_value=grpc.aio.EOF)
267268

268269
mux = _StreamMultiplexer(mock_stream)
269270
q1 = mux.register({10})
@@ -328,7 +329,7 @@ async def test_unknown_read_id_is_dropped(self):
328329
# Given a multiplexer and a response with an unknown read ID
329330
mock_stream = AsyncMock()
330331
resp = _make_response(read_id=999)
331-
mock_stream.recv = AsyncMock(side_effect=[resp, None])
332+
mock_stream.recv = AsyncMock(side_effect=[resp, grpc.aio.EOF])
332333

333334
mux = _StreamMultiplexer(mock_stream)
334335
queue = mux.register({10})

0 commit comments

Comments
 (0)