Skip to content

Commit 0a6ea84

Browse files
committed
feat: add _StreamMultiplexer skeleton with sentinel types
1 parent 14abfd5 commit 0a6ea84

File tree

2 files changed

+123
-0
lines changed

2 files changed

+123
-0
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
import asyncio
18+
import logging
19+
from typing import Awaitable, Callable, Dict, Optional, Set
20+
21+
from google.cloud import _storage_v2
22+
from google.cloud.storage.asyncio.async_read_object_stream import (
23+
_AsyncReadObjectStream,
24+
)
25+
26+
logger = logging.getLogger(__name__)
27+
28+
_DEFAULT_QUEUE_MAX_SIZE = 100
29+
30+
31+
class _StreamError:
32+
"""Wraps a retryable error with the stream generation that produced it."""
33+
34+
def __init__(self, exception: Exception, generation: int):
35+
self.exception = exception
36+
self.generation = generation
37+
38+
39+
class _StreamEnd:
40+
"""Signals the stream closed normally."""
41+
42+
pass
43+
44+
45+
class _StreamMultiplexer:
46+
"""Multiplexes concurrent download tasks over a single bidi-gRPC stream.
47+
48+
Routes responses from a background recv loop to per-task asyncio.Queues
49+
keyed by read_id. Serializes sends via lock. Coordinates stream reopening
50+
via generation-gated locking.
51+
52+
A slow consumer on one task will slow down the entire shared connection
53+
due to bounded queue backpressure propagating through gRPC flow control.
54+
"""
55+
56+
def __init__(
57+
self,
58+
stream: _AsyncReadObjectStream,
59+
queue_max_size: int = _DEFAULT_QUEUE_MAX_SIZE,
60+
):
61+
self._stream = stream
62+
self._stream_generation: int = 0
63+
self._queues: Dict[int, asyncio.Queue] = {}
64+
self._send_lock = asyncio.Lock()
65+
self._reopen_lock = asyncio.Lock()
66+
self._recv_task: Optional[asyncio.Task] = None
67+
self._queue_max_size = queue_max_size
68+
69+
@property
70+
def stream_generation(self) -> int:
71+
return self._stream_generation
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
import pytest
17+
from unittest.mock import AsyncMock, MagicMock
18+
19+
from google.cloud.storage.asyncio._stream_multiplexer import (
20+
_StreamMultiplexer,
21+
_StreamError,
22+
_StreamEnd,
23+
_DEFAULT_QUEUE_MAX_SIZE,
24+
)
25+
26+
27+
class TestSentinelTypes:
28+
def test_stream_error_stores_exception_and_generation(self):
29+
exc = ValueError("test")
30+
error = _StreamError(exc, generation=3)
31+
assert error.exception is exc
32+
assert error.generation == 3
33+
34+
def test_stream_end_is_instantiable(self):
35+
sentinel = _StreamEnd()
36+
assert isinstance(sentinel, _StreamEnd)
37+
38+
39+
class TestStreamMultiplexerInit:
40+
def test_init_sets_stream_and_defaults(self):
41+
mock_stream = AsyncMock()
42+
mux = _StreamMultiplexer(mock_stream)
43+
assert mux._stream is mock_stream
44+
assert mux.stream_generation == 0
45+
assert mux._queues == {}
46+
assert mux._recv_task is None
47+
assert mux._queue_max_size == _DEFAULT_QUEUE_MAX_SIZE
48+
49+
def test_init_custom_queue_size(self):
50+
mock_stream = AsyncMock()
51+
mux = _StreamMultiplexer(mock_stream, queue_max_size=50)
52+
assert mux._queue_max_size == 50

0 commit comments

Comments
 (0)