Skip to content

Commit 2096991

Browse files
authored
perf: add multiplexing performance tests for AsyncMultiRangeDownloader (#16501)
## Overview This PR introduces new microbenchmarks to measure and expose the performance bottleneck caused by lock contention in the `AsyncMultiRangeDownloader`. It provides a concrete way to compare the previous serialized implementation against the new multiplexed architecture. ## Before vs. After: The Performance Gap ### Before (Serialized via Lock) In the previous implementation, `download_ranges` used a shared lock to prevent concurrent access to the bidi-gRPC stream. This meant that even with multiple coroutines, only one could "own" the stream at a time. The entire download cycle (Send -> Receive All) had to complete before another task could start. **Execution Flow:** ```mermaid sequenceDiagram participant C1 as Coroutine 1 participant C2 as Coroutine 2 participant S as gRPC Stream C1->>C1: Acquire Lock C1->>S: Send Requests S-->>C1: Receive Data (Streaming...) S-->>C1: End of Range C1->>C1: Release Lock Note over C2: Waiting for Lock... C2->>C2: Acquire Lock C2->>S: Send Requests S-->>C2: Receive Data (Streaming...) S-->>C2: End of Range C2->>C2: Release Lock ``` ### After (Multiplexed Concurrent) With the introduction of the `_StreamMultiplexer`, multiple coroutines can now share the same stream concurrently. Requests are interleaved, and a background receiver loop routes incoming data to the correct task using `read_id`. **Execution Flow:** ```mermaid sequenceDiagram participant C1 as Coroutine 1 participant C2 as Coroutine 2 participant M as Multiplexer participant S as gRPC Stream C1->>M: Send Requests M->>S: Forward Req 1 C2->>M: Send Requests M->>S: Forward Req 2 Note over C1,C2: Tasks wait on their own queues S-->>M: Data for C1 M-->>C1: Route to Q1 S-->>M: Data for C2 M-->>C2: Route to Q2 S-->>M: Data for C1 M-->>C1: Route to Q1 ``` ## How the Benchmark Works This PR adds a `read_rand_multi_coro` workload that: 1. Spawns multiple asynchronous tasks (coroutines). 2. Shares a single `AsyncMultiRangeDownloader` instance across all tasks. 3. 
Simulates the old serialized behavior by explicitly passing a `shared_lock` to `download_ranges`. 4. Measures total throughput (MiB/s) and resource utilization. ## Key Changes - **`test_reads.py`**: Refactored to support launching concurrent coroutines within a single worker process. - **`config.yaml`**: Added `read_rand_multi_coro` with 1 and 16 coroutines to stress the downloader. - **`config.py`**: Updated the naming convention to include the coroutine count (e.g., `16c`) in reports so parameter sets are easier to tell apart.
1 parent d3d6840 commit 2096991

File tree

5 files changed

+45
-40
lines changed

5 files changed

+45
-40
lines changed

packages/google-cloud-storage/output.json

Whitespace-only changes.

packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@
1717
@pytest.fixture
1818
def workload_params(request):
1919
params = request.param
20-
files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_processes)]
20+
files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_files)]
2121
return params, files_names

packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads/config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,10 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]:
8080
chunk_size_bytes = chunk_size_kib * 1024
8181
bucket_name = bucket_map[bucket_type]
8282

83-
num_files = num_processes * num_coros
83+
num_files = num_processes
8484

8585
# Create a descriptive name for the parameter set
86-
name = f"{pattern}_{bucket_type}_{num_processes}p_{file_size_mib}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges"
86+
name = f"{pattern}_{bucket_type}_{num_processes}p_{num_coros}c_{file_size_mib}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges"
8787

8888
params[workload_name].append(
8989
TimeBasedReadParameters(

packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads/config.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ workload:
2020

2121
- name: "read_rand_multi_process"
2222
pattern: "rand"
23-
coros: [1]
23+
coros: [1, 16]
2424
processes: [1]
2525

26+
2627
defaults:
2728
DEFAULT_RAPID_ZONAL_BUCKET: "chandrasiri-benchmarks-zb"
2829
DEFAULT_STANDARD_BUCKET: "chandrasiri-benchmarks-rb"

packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads/test_reads.py

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -115,47 +115,51 @@ def _download_time_based_json(client, filename, params):
115115

116116

117117
async def _download_time_based_async(client, filename, params):
118-
total_bytes_downloaded = 0
119-
120118
mrd = AsyncMultiRangeDownloader(client, params.bucket_name, filename)
121119
await mrd.open()
122120

123-
offset = 0
124-
is_warming_up = True
125-
start_time = time.monotonic()
126-
warmup_end_time = start_time + params.warmup_duration
127-
test_end_time = warmup_end_time + params.duration
128-
129-
while time.monotonic() < test_end_time:
130-
current_time = time.monotonic()
131-
if is_warming_up and current_time >= warmup_end_time:
132-
is_warming_up = False
133-
total_bytes_downloaded = 0 # Reset counter after warmup
134-
135-
ranges = []
136-
if params.pattern == "rand":
137-
for _ in range(params.num_ranges):
138-
offset = random.randint(
139-
0, params.file_size_bytes - params.chunk_size_bytes
140-
)
141-
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
142-
else: # seq
143-
for _ in range(params.num_ranges):
144-
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
145-
offset += params.chunk_size_bytes
146-
if offset + params.chunk_size_bytes > params.file_size_bytes:
147-
offset = 0 # Reset offset if end of file is reached
148-
149-
await mrd.download_ranges(ranges)
150-
151-
bytes_in_buffers = sum(r[2].getbuffer().nbytes for r in ranges)
152-
assert bytes_in_buffers == params.chunk_size_bytes * params.num_ranges
153-
154-
if not is_warming_up:
155-
total_bytes_downloaded += params.chunk_size_bytes * params.num_ranges
121+
async def _worker_coro():
122+
total_bytes_downloaded = 0
123+
offset = 0
124+
is_warming_up = True
125+
start_time = time.monotonic()
126+
warmup_end_time = start_time + params.warmup_duration
127+
test_end_time = warmup_end_time + params.duration
128+
129+
while time.monotonic() < test_end_time:
130+
current_time = time.monotonic()
131+
if is_warming_up and current_time >= warmup_end_time:
132+
is_warming_up = False
133+
total_bytes_downloaded = 0 # Reset counter after warmup
134+
135+
ranges = []
136+
if params.pattern == "rand":
137+
for _ in range(params.num_ranges):
138+
offset = random.randint(
139+
0, params.file_size_bytes - params.chunk_size_bytes
140+
)
141+
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
142+
else: # seq
143+
for _ in range(params.num_ranges):
144+
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
145+
offset += params.chunk_size_bytes
146+
if offset + params.chunk_size_bytes > params.file_size_bytes:
147+
offset = 0 # Reset offset if end of file is reached
148+
149+
await mrd.download_ranges(ranges)
150+
151+
bytes_in_buffers = sum(r[2].getbuffer().nbytes for r in ranges)
152+
assert bytes_in_buffers == params.chunk_size_bytes * params.num_ranges
153+
154+
if not is_warming_up:
155+
total_bytes_downloaded += params.chunk_size_bytes * params.num_ranges
156+
return total_bytes_downloaded
157+
158+
tasks = [asyncio.create_task(_worker_coro()) for _ in range(params.num_coros)]
159+
results = await asyncio.gather(*tasks)
156160

157161
await mrd.close()
158-
return total_bytes_downloaded
162+
return sum(results)
159163

160164

161165
def _download_files_worker(process_idx, filename, params, bucket_type):

0 commit comments

Comments
 (0)