Skip to content

Commit fa7e79e

Browse files
committed
feat: add read throughput micro-benchmark for ArrowScan configurations
1 parent c383049 commit fa7e79e

File tree

1 file changed

+181
-0
lines changed

1 file changed

+181
-0
lines changed
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
"""Read throughput micro-benchmark for ArrowScan configurations.
18+
19+
Measures records/sec and peak Arrow memory across streaming, concurrent_files,
20+
and batch_size configurations introduced for issue #3036.
21+
22+
Memory is measured using pa.total_allocated_bytes() which tracks PyArrow's C++
23+
memory pool (Arrow buffers, Parquet decompression), not Python heap allocations.
24+
25+
Run with: uv run pytest tests/benchmark/test_read_benchmark.py -v -s -m benchmark
26+
"""
27+
28+
import gc
29+
import statistics
30+
import timeit
31+
from datetime import datetime, timezone
32+
33+
import pyarrow as pa
34+
import pyarrow.parquet as pq
35+
import pytest
36+
37+
from pyiceberg.catalog.sql import SqlCatalog
38+
39+
NUM_FILES = 32
40+
ROWS_PER_FILE = 500_000
41+
TOTAL_ROWS = NUM_FILES * ROWS_PER_FILE
42+
NUM_RUNS = 3
43+
44+
45+
def _generate_parquet_file(path: str, num_rows: int, seed: int) -> pa.Schema:
    """Write one synthetic Parquet file at *path* and return its Arrow schema.

    Column layout mirrors a typical mixed-type table: an int64 ``id`` offset
    by *seed* (so ids stay unique across files), a float64 ``value``, a string
    ``label``, a bool ``flag``, and a tz-aware ``ts`` timestamp. All columns
    except ``id`` are deterministic functions of the row index alone.
    """
    # Single timestamp shared by every row — one datetime.now() call, not one per row.
    now = datetime.now(timezone.utc)
    columns = {
        "id": pa.array(range(seed, seed + num_rows), type=pa.int64()),
        "value": pa.array([idx * 0.1 for idx in range(num_rows)], type=pa.float64()),
        "label": pa.array([f"row_{idx}" for idx in range(num_rows)], type=pa.string()),
        "flag": pa.array([idx % 2 == 0 for idx in range(num_rows)], type=pa.bool_()),
        "ts": pa.array([now] * num_rows, type=pa.timestamp("us", tz="UTC")),
    }
    synthetic = pa.table(columns)
    pq.write_table(synthetic, path)
    return synthetic.schema
58+
59+
60+
@pytest.fixture(scope="session")
def benchmark_table(tmp_path_factory: pytest.TempPathFactory) -> "pyiceberg.table.Table":  # noqa: F821
    """Session-scoped Iceberg table backed by NUM_FILES synthetic Parquet files.

    Builds a SQLite-backed SqlCatalog in a temp warehouse, then generates and
    appends each file in turn. Session scope amortizes the (slow) data
    generation across all parametrized benchmark cases.
    """
    warehouse = str(tmp_path_factory.mktemp("benchmark_warehouse"))
    catalog = SqlCatalog(
        "benchmark",
        uri=f"sqlite:///{warehouse}/pyiceberg_catalog.db",
        warehouse=f"file://{warehouse}",
    )
    catalog.create_namespace("default")

    table = None
    for file_idx in range(NUM_FILES):
        parquet_path = f"{warehouse}/data_{file_idx}.parquet"
        _generate_parquet_file(parquet_path, ROWS_PER_FILE, seed=file_idx * ROWS_PER_FILE)

        arrow_table = pq.read_table(parquet_path)
        if table is None:
            # Create the Iceberg table lazily so its schema comes from the data itself.
            table = catalog.create_table("default.benchmark_read", schema=arrow_table.schema)
        table.append(arrow_table)

    return table
83+
84+
85+
def _measure_peak_arrow_memory(benchmark_table, batch_size, streaming, concurrent_files):
    """Scan the table once; return (total rows read, peak Arrow bytes above baseline).

    The peak is sampled once per batch via ``pa.total_allocated_bytes()``,
    which tracks PyArrow's C++ memory pool rather than the Python heap, so
    within-batch allocation spikes between samples are not observed.
    """
    # Start from a clean pool so the baseline reflects pre-scan state.
    gc.collect()
    pa.default_memory_pool().release_unused()
    baseline = pa.total_allocated_bytes()
    peak = baseline

    row_count = 0
    reader = benchmark_table.scan().to_arrow_batch_reader(
        batch_size=batch_size,
        streaming=streaming,
        concurrent_files=concurrent_files,
    )
    for batch in reader:
        row_count += len(batch)
        peak = max(peak, pa.total_allocated_bytes())
        # Drop our reference right away to mimic a streaming consumer.
        del batch

    return row_count, peak - baseline
106+
107+
108+
@pytest.mark.benchmark
@pytest.mark.parametrize(
    "streaming,concurrent_files,batch_size",
    [
        pytest.param(False, 1, None, id="default"),
        pytest.param(True, 1, None, id="streaming-cf1"),
        pytest.param(True, 2, None, id="streaming-cf2"),
        pytest.param(True, 4, None, id="streaming-cf4"),
        pytest.param(True, 8, None, id="streaming-cf8"),
        pytest.param(True, 16, None, id="streaming-cf16"),
    ],
)
def test_read_throughput(
    benchmark_table: "pyiceberg.table.Table",  # noqa: F821
    streaming: bool,
    concurrent_files: int,
    batch_size: int | None,
) -> None:
    """Measure records/sec and peak Arrow memory for one scan configuration.

    Runs the scan NUM_RUNS times, printing per-run and aggregate statistics.
    Each run independently verifies that the scan yielded exactly TOTAL_ROWS
    rows, so a short read on any run fails immediately rather than being
    masked by a correct final run.
    """
    effective_batch_size = batch_size or 131_072  # PyArrow default
    if streaming:
        config_str = f"streaming=True, concurrent_files={concurrent_files}, batch_size={effective_batch_size}"
    else:
        config_str = f"streaming=False (executor.map, all files parallel), batch_size={effective_batch_size}"
    # Plain string: the original was an f-string with no placeholders (ruff F541).
    print("\n--- ArrowScan Read Throughput Benchmark ---")
    print(f"Config: {config_str}")
    print(f" Files: {NUM_FILES}, Rows per file: {ROWS_PER_FILE}, Total rows: {TOTAL_ROWS}")

    elapsed_times: list[float] = []
    throughputs: list[float] = []
    peak_memories: list[int] = []

    for run in range(NUM_RUNS):
        # Reset the Arrow pool so each run's peak starts from a clean baseline.
        # NOTE(review): this inlines the same sampling logic as
        # _measure_peak_arrow_memory because we additionally need wall-clock
        # timing around the loop; keep the two in sync.
        gc.collect()
        pa.default_memory_pool().release_unused()
        baseline_mem = pa.total_allocated_bytes()
        peak_mem = baseline_mem

        start = timeit.default_timer()
        total_rows = 0
        for batch in benchmark_table.scan().to_arrow_batch_reader(
            batch_size=batch_size,
            streaming=streaming,
            concurrent_files=concurrent_files,
        ):
            total_rows += len(batch)
            # One C++-pool sample per batch: cheap enough not to distort timing,
            # but within-batch spikes between samples go unobserved.
            current_mem = pa.total_allocated_bytes()
            if current_mem > peak_mem:
                peak_mem = current_mem
        elapsed = timeit.default_timer() - start

        # Verify row count on every run, not just the last one.
        assert total_rows == TOTAL_ROWS, f"Expected {TOTAL_ROWS} rows, got {total_rows}"

        peak_above_baseline = peak_mem - baseline_mem
        rows_per_sec = total_rows / elapsed if elapsed > 0 else 0
        elapsed_times.append(elapsed)
        throughputs.append(rows_per_sec)
        peak_memories.append(peak_above_baseline)

        print(
            f" Run {run + 1}: {elapsed:.2f}s, {rows_per_sec:,.0f} rows/s, "
            f"peak arrow mem: {peak_above_baseline / (1024 * 1024):.1f} MB"
        )

    mean_elapsed = statistics.mean(elapsed_times)
    stdev_elapsed = statistics.stdev(elapsed_times) if len(elapsed_times) > 1 else 0.0
    mean_throughput = statistics.mean(throughputs)
    mean_peak_mem = statistics.mean(peak_memories)

    print(
        f" Mean: {mean_elapsed:.2f}s ± {stdev_elapsed:.2f}s, {mean_throughput:,.0f} rows/s, "
        f"peak arrow mem: {mean_peak_mem / (1024 * 1024):.1f} MB"
    )

0 commit comments

Comments
 (0)