Skip to content

Commit ebd3130

Browse files
committed
fix: raise ServiceUnavailable for unregistered read_id in _stream_multiplexer
1 parent a40a7ba commit ebd3130

File tree

2 files changed

+15
-6
lines changed

2 files changed

+15
-6
lines changed

packages/google-cloud-storage/google/cloud/storage/asyncio/_stream_multiplexer.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
from __future__ import annotations
1616

1717
import asyncio
18-
import grpc
1918
import logging
2019
from typing import Awaitable, Callable, Dict, Optional, Set
2120

21+
import grpc
22+
from google.api_core import exceptions
23+
2224
from google.cloud import _storage_v2
2325
from google.cloud.storage.asyncio.async_read_object_stream import (
2426
_AsyncReadObjectStream,
@@ -139,6 +141,10 @@ async def _recv_loop(self) -> None:
139141
queue = self._queues.get(read_id)
140142
if queue:
141143
queues_to_notify.add(queue)
144+
else:
145+
raise exceptions.ServiceUnavailable(
146+
f"Received data for unregistered read_id: {read_id}"
147+
)
142148
await asyncio.gather(
143149
*(
144150
self._put_with_timeout(queue, response)

packages/google-cloud-storage/tests/unit/asyncio/test_stream_multiplexer.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313
# limitations under the License.
1414

1515
import asyncio
16-
import grpc
1716
from unittest.mock import AsyncMock
1817

18+
import grpc
1919
import pytest
20+
from google.api_core import exceptions
2021

2122
from google.cloud import _storage_v2
2223
from google.cloud.storage.asyncio._stream_multiplexer import (
@@ -325,7 +326,7 @@ async def test_error_uses_put_nowait(self):
325326
assert err.exception is exc
326327

327328
@pytest.mark.asyncio
328-
async def test_unknown_read_id_is_dropped(self):
329+
async def test_unknown_read_id_raises_service_unavailable(self):
329330
# Given a multiplexer and a response with an unknown read ID
330331
mock_stream = AsyncMock()
331332
resp = _make_response(read_id=999)
@@ -337,9 +338,11 @@ async def test_unknown_read_id_is_dropped(self):
337338
# When the receive loop is started
338339
mux._ensure_recv_loop()
339340

340-
# Then the response is dropped and only StreamEnd is received
341-
end = await asyncio.wait_for(queue.get(), timeout=1)
342-
assert isinstance(end, _StreamEnd)
341+
# Then a StreamError is broadcast to all queues
342+
err = await asyncio.wait_for(queue.get(), timeout=1)
343+
assert isinstance(err, _StreamError)
344+
assert isinstance(err.exception, exceptions.ServiceUnavailable)
345+
assert "Received data for unregistered read_id: 999" in str(err.exception)
343346

344347

345348
class TestSend:

0 commit comments

Comments
 (0)