@@ -86,33 +86,28 @@ def benchmark_table(tmp_path_factory: pytest.TempPathFactory) -> Table:
8686
8787@pytest .mark .benchmark
8888@pytest .mark .parametrize (
89- "streaming,concurrent_files,batch_size,max_workers " ,
89+ "streaming,concurrent_files,batch_size" ,
9090 [
91- pytest .param (False , 1 , None , None , id = "default" ),
92- pytest .param (False , 1 , None , 4 , id = "default-4threads" ),
93- pytest .param (True , 1 , None , None , id = "streaming-cf1" ),
94- pytest .param (True , 2 , None , None , id = "streaming-cf2" ),
95- pytest .param (True , 4 , None , None , id = "streaming-cf4" ),
96- pytest .param (True , 8 , None , None , id = "streaming-cf8" ),
97- pytest .param (True , 16 , None , None , id = "streaming-cf16" ),
91+ pytest .param (False , 1 , None , id = "default" ),
92+ pytest .param (True , 1 , None , id = "streaming-cf1" ),
93+ pytest .param (True , 2 , None , id = "streaming-cf2" ),
94+ pytest .param (True , 4 , None , id = "streaming-cf4" ),
95+ pytest .param (True , 8 , None , id = "streaming-cf8" ),
96+ pytest .param (True , 16 , None , id = "streaming-cf16" ),
9897 ],
9998)
10099def test_read_throughput (
101100 benchmark_table : Table ,
102101 streaming : bool ,
103102 concurrent_files : int ,
104103 batch_size : int | None ,
105- max_workers : int | None ,
106104) -> None :
107105 """Measure records/sec, time to first record, and peak Arrow memory for a scan configuration."""
108- from pyiceberg .utils .concurrent import ExecutorFactory
109-
110106 effective_batch_size = batch_size or 131_072 # PyArrow default
111107 if streaming :
112108 config_str = f"streaming=True, concurrent_files={ concurrent_files } , batch_size={ effective_batch_size } "
113109 else :
114- workers_str = f", max_workers={ max_workers } " if max_workers else ""
115- config_str = f"streaming=False (executor.map, all files parallel), batch_size={ effective_batch_size } { workers_str } "
110+ config_str = f"streaming=False (executor.map, all files parallel), batch_size={ effective_batch_size } "
116111 print ("\n --- ArrowScan Read Throughput Benchmark ---" )
117112 print (f"Config: { config_str } " )
118113 print (f" Files: { NUM_FILES } , Rows per file: { ROWS_PER_FILE } , Total rows: { TOTAL_ROWS } " )
@@ -122,55 +117,43 @@ def test_read_throughput(
122117 peak_memories : list [int ] = []
123118 ttfr_times : list [float ] = []
124119
125- # Override max_workers if specified
126- original_instance = None
127- if max_workers is not None :
128- from concurrent .futures import ThreadPoolExecutor
129-
130- original_instance = ExecutorFactory ._instance
131- ExecutorFactory ._instance = ThreadPoolExecutor (max_workers = max_workers )
132-
133- try :
134- for run in range (NUM_RUNS ):
135- # Measure throughput
136- gc .collect ()
137- pa .default_memory_pool ().release_unused ()
138- baseline_mem = pa .total_allocated_bytes ()
139- peak_mem = baseline_mem
140-
141- start = timeit .default_timer ()
142- total_rows = 0
143- first_batch_time = None
144- for batch in benchmark_table .scan ().to_arrow_batch_reader (
145- batch_size = batch_size ,
146- streaming = streaming ,
147- concurrent_files = concurrent_files ,
148- ):
149- if first_batch_time is None :
150- first_batch_time = timeit .default_timer () - start
151- total_rows += len (batch )
152- current_mem = pa .total_allocated_bytes ()
153- if current_mem > peak_mem :
154- peak_mem = current_mem
155- elapsed = timeit .default_timer () - start
156-
157- peak_above_baseline = peak_mem - baseline_mem
158- rows_per_sec = total_rows / elapsed if elapsed > 0 else 0
159- elapsed_times .append (elapsed )
160- throughputs .append (rows_per_sec )
161- peak_memories .append (peak_above_baseline )
162- ttfr_times .append (first_batch_time or 0.0 )
163-
164- print (
165- f" Run { run + 1 } : { elapsed :.2f} s, { rows_per_sec :,.0f} rows/s, "
166- f"TTFR: { (first_batch_time or 0 ) * 1000 :.1f} ms, "
167- f"peak arrow mem: { peak_above_baseline / (1024 * 1024 ):.1f} MB"
168- )
169-
170- assert total_rows == TOTAL_ROWS , f"Expected { TOTAL_ROWS } rows, got { total_rows } "
171- finally :
172- if original_instance is not None :
173- ExecutorFactory ._instance = original_instance
120+ for run in range (NUM_RUNS ):
121+ # Measure throughput
122+ gc .collect ()
123+ pa .default_memory_pool ().release_unused ()
124+ baseline_mem = pa .total_allocated_bytes ()
125+ peak_mem = baseline_mem
126+
127+ start = timeit .default_timer ()
128+ total_rows = 0
129+ first_batch_time = None
130+ for batch in benchmark_table .scan ().to_arrow_batch_reader (
131+ batch_size = batch_size ,
132+ streaming = streaming ,
133+ concurrent_files = concurrent_files ,
134+ ):
135+ if first_batch_time is None :
136+ first_batch_time = timeit .default_timer () - start
137+ total_rows += len (batch )
138+ current_mem = pa .total_allocated_bytes ()
139+ if current_mem > peak_mem :
140+ peak_mem = current_mem
141+ elapsed = timeit .default_timer () - start
142+
143+ peak_above_baseline = peak_mem - baseline_mem
144+ rows_per_sec = total_rows / elapsed if elapsed > 0 else 0
145+ elapsed_times .append (elapsed )
146+ throughputs .append (rows_per_sec )
147+ peak_memories .append (peak_above_baseline )
148+ ttfr_times .append (first_batch_time or 0.0 )
149+
150+ print (
151+ f" Run { run + 1 } : { elapsed :.2f} s, { rows_per_sec :,.0f} rows/s, "
152+ f"TTFR: { (first_batch_time or 0 ) * 1000 :.1f} ms, "
153+ f"peak arrow mem: { peak_above_baseline / (1024 * 1024 ):.1f} MB"
154+ )
155+
156+ assert total_rows == TOTAL_ROWS , f"Expected { TOTAL_ROWS } rows, got { total_rows } "
174157
175158 mean_elapsed = statistics .mean (elapsed_times )
176159 stdev_elapsed = statistics .stdev (elapsed_times ) if len (elapsed_times ) > 1 else 0.0
0 commit comments