1515// specific language governing permissions and limitations
1616// under the License.
1717
18- //! A transparent wrapper around a scan ExecutionPlan that logs memory stats
19- //! when the scan stream reaches EOF. Uses jemalloc stats (when available)
20- //! to track how much memory was allocated during the scan phase.
18+ //! A transparent wrapper around a scan ExecutionPlan that tracks per-partition
19+ //! memory allocation using jemalloc's per-thread counters.
2120//!
22- //! Captures three jemalloc snapshots:
23- //! 1. Before `child.execute()` — baseline
24- //! 2. After `child.execute()` — cost of stream setup (file opens, metadata reads)
25- //! 3. On stream EOF — total cost of the entire scan including all data reads
26- //!
27- //! Note: jemalloc `allocated` is process-wide, so concurrent partitions will
28- //! see each other's allocations. The deltas are approximate but still useful
29- //! for identifying which partitions drive memory growth.
21+ //! Uses `thread.allocatedp` / `thread.deallocatedp` which are thread-local
22+ //! monotonic counters. By measuring deltas around each `poll_next()` call,
23+ //! we get accurate per-partition allocation/deallocation numbers even with
24+ //! concurrent partitions on different threads.
3025
3126use arrow:: array:: RecordBatch ;
3227use arrow:: datatypes:: SchemaRef ;
@@ -42,36 +37,57 @@ use datafusion::physical_plan::{
4237use futures:: Stream ;
4338use std:: any:: Any ;
4439use std:: pin:: Pin ;
45- use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
40+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
4641use std:: sync:: Arc ;
4742use std:: task:: { Context , Poll } ;
4843
4944/// Global counters to aggregate stats across all partitions.
50- /// These are process-wide — multiple concurrent queries will aggregate together.
51- static TOTAL_DELTA_SUM : AtomicUsize = AtomicUsize :: new ( 0 ) ;
52- static MAX_PEAK : AtomicUsize = AtomicUsize :: new ( 0 ) ;
53- static COMPLETED_PARTITIONS : AtomicUsize = AtomicUsize :: new ( 0 ) ;
54- static TOTAL_ROWS_SUM : AtomicUsize = AtomicUsize :: new ( 0 ) ;
55-
56- /// Read jemalloc `stats::allocated` (bytes currently allocated by the application).
57- /// Returns 0 if jemalloc feature is not enabled.
45+ static AGG_THREAD_ALLOCATED : AtomicU64 = AtomicU64 :: new ( 0 ) ;
46+ static AGG_THREAD_DEALLOCATED : AtomicU64 = AtomicU64 :: new ( 0 ) ;
47+ static AGG_PEAK_PROCESS_ALLOCATED : AtomicU64 = AtomicU64 :: new ( 0 ) ;
48+ static AGG_COMPLETED_PARTITIONS : AtomicU64 = AtomicU64 :: new ( 0 ) ;
49+ static AGG_TOTAL_ROWS : AtomicU64 = AtomicU64 :: new ( 0 ) ;
50+
51+ /// Per-thread allocated/deallocated from jemalloc.
52+ /// These are monotonically increasing counters scoped to the calling thread.
53+ #[ cfg( feature = "jemalloc" ) ]
54+ fn jemalloc_thread_stats ( ) -> ( u64 , u64 ) {
55+ use tikv_jemalloc_ctl:: thread;
56+ let allocated = thread:: allocatedp:: read ( ) . map ( |p| p. get ( ) ) . unwrap_or ( 0 ) ;
57+ let deallocated = thread:: deallocatedp:: read ( ) . map ( |p| p. get ( ) ) . unwrap_or ( 0 ) ;
58+ ( allocated, deallocated)
59+ }
60+
61+ #[ cfg( not( feature = "jemalloc" ) ) ]
62+ fn jemalloc_thread_stats ( ) -> ( u64 , u64 ) {
63+ ( 0 , 0 )
64+ }
65+
66+ /// Process-wide jemalloc allocated bytes.
5867#[ cfg( feature = "jemalloc" ) ]
59- fn jemalloc_allocated ( ) -> usize {
68+ fn jemalloc_process_allocated ( ) -> u64 {
6069 use tikv_jemalloc_ctl:: { epoch, stats} ;
6170 epoch:: advance ( ) . ok ( ) ;
62- stats:: allocated:: read ( ) . unwrap_or ( 0 )
71+ stats:: allocated:: read ( ) . unwrap_or ( 0 ) as u64
6372}
6473
6574#[ cfg( not( feature = "jemalloc" ) ) ]
66- fn jemalloc_allocated ( ) -> usize {
75+ fn jemalloc_process_allocated ( ) -> u64 {
6776 0
6877}
6978
70- /// Wraps a child ExecutionPlan and logs memory stats when the child's stream
71- /// reaches EOF. Passes through all batches unchanged.
72- ///
73- /// `spark_partition` is the actual Spark partition index (not the DataFusion
74- /// partition, which is always 0 in Comet's per-partition execution model).
79+ fn update_atomic_max ( atomic : & AtomicU64 , value : u64 ) {
80+ let mut current = atomic. load ( Ordering :: Relaxed ) ;
81+ while value > current {
82+ match atomic. compare_exchange_weak ( current, value, Ordering :: Relaxed , Ordering :: Relaxed ) {
83+ Ok ( _) => break ,
84+ Err ( actual) => current = actual,
85+ }
86+ }
87+ }
88+
89+ /// Wraps a child ExecutionPlan and tracks memory allocated/deallocated
90+ /// during scan execution using jemalloc per-thread counters.
7591#[ derive( Debug ) ]
7692pub struct ScanMemoryLogExec {
7793 child : Arc < dyn ExecutionPlan > ,
@@ -140,20 +156,35 @@ impl ExecutionPlan for ScanMemoryLogExec {
140156 partition : usize ,
141157 context : Arc < TaskContext > ,
142158 ) -> DataFusionResult < SendableRecordBatchStream > {
143- // Snapshot before child. execute() — baseline
144- let allocated_before_execute = jemalloc_allocated ( ) ;
159+ // Capture thread-local counters before execute()
160+ let ( thread_alloc_before , thread_dealloc_before ) = jemalloc_thread_stats ( ) ;
145161
146162 let child_stream = self . child . execute ( partition, Arc :: clone ( & context) ) ?;
147163
164+ // Capture after execute() to measure setup cost
165+ let ( thread_alloc_after, thread_dealloc_after) = jemalloc_thread_stats ( ) ;
166+ let execute_allocated = thread_alloc_after - thread_alloc_before;
167+ let execute_deallocated = thread_dealloc_after - thread_dealloc_before;
168+
169+ log:: info!(
170+ "ScanMemoryLogExec spark_partition={}: execute() setup: \
171+ thread_allocated={}, thread_deallocated={}, net={}",
172+ self . spark_partition,
173+ execute_allocated,
174+ execute_deallocated,
175+ execute_allocated as i64 - execute_deallocated as i64 ,
176+ ) ;
177+
148178 let baseline_metrics = BaselineMetrics :: new ( & self . metrics , partition) ;
149179 Ok ( Box :: pin ( ScanMemoryLogStream {
150180 child : child_stream,
151181 context,
152182 spark_partition : self . spark_partition ,
153183 logged : false ,
154184 baseline_metrics,
155- allocated_before_scan : allocated_before_execute,
156- peak_allocated : allocated_before_execute,
185+ // Accumulate total thread-level alloc/dealloc across all polls
186+ thread_total_allocated : execute_allocated,
187+ thread_total_deallocated : execute_deallocated,
157188 batch_count : 0 ,
158189 total_rows : 0 ,
159190 } ) )
@@ -178,80 +209,74 @@ struct ScanMemoryLogStream {
178209 spark_partition : i32 ,
179210 logged : bool ,
180211 baseline_metrics : BaselineMetrics ,
181- /// jemalloc allocated bytes captured before child.execute()
182- allocated_before_scan : usize ,
183- /// High-water mark of jemalloc allocated bytes seen during polling
184- peak_allocated : usize ,
185- /// Number of batches consumed through this stream
212+ /// Accumulated bytes allocated on the thread during this scan
213+ thread_total_allocated : u64 ,
214+ /// Accumulated bytes deallocated on the thread during this scan
215+ thread_total_deallocated : u64 ,
186216 batch_count : usize ,
187- /// Total rows consumed through this stream
188217 total_rows : usize ,
189218}
190219
191220impl Stream for ScanMemoryLogStream {
192221 type Item = DataFusionResult < RecordBatch > ;
193222
194223 fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
224+ // Snapshot thread counters before polling child
225+ let ( alloc_before, dealloc_before) = jemalloc_thread_stats ( ) ;
226+
195227 let result = Pin :: new ( & mut self . child ) . poll_next ( cx) ;
228+
229+ // Snapshot after — delta is what this poll allocated/freed
230+ let ( alloc_after, dealloc_after) = jemalloc_thread_stats ( ) ;
231+ self . thread_total_allocated += alloc_after - alloc_before;
232+ self . thread_total_deallocated += dealloc_after - dealloc_before;
233+
196234 match & result {
197235 Poll :: Ready ( Some ( Ok ( batch) ) ) => {
198236 self . batch_count += 1 ;
199237 self . total_rows += batch. num_rows ( ) ;
200238 self . baseline_metrics . record_output ( batch. num_rows ( ) ) ;
201- // Track peak allocation across all batches
202- let current = jemalloc_allocated ( ) ;
203- if current > self . peak_allocated {
204- self . peak_allocated = current;
205- }
206239 }
207240 Poll :: Ready ( None ) if !self . logged => {
208241 self . logged = true ;
209242 let pool = self . context . memory_pool ( ) ;
210- let allocated_at_eof = jemalloc_allocated ( ) ;
211- let total_delta =
212- allocated_at_eof. saturating_sub ( self . allocated_before_scan ) ;
213- let peak_delta =
214- self . peak_allocated . saturating_sub ( self . allocated_before_scan ) ;
215-
216- // Accumulate into global counters
217- TOTAL_DELTA_SUM . fetch_add ( total_delta, Ordering :: Relaxed ) ;
218- TOTAL_ROWS_SUM . fetch_add ( self . total_rows , Ordering :: Relaxed ) ;
219- // Update max peak (atomic CAS loop)
220- let mut current_max = MAX_PEAK . load ( Ordering :: Relaxed ) ;
221- while self . peak_allocated > current_max {
222- match MAX_PEAK . compare_exchange_weak (
223- current_max,
224- self . peak_allocated ,
225- Ordering :: Relaxed ,
226- Ordering :: Relaxed ,
227- ) {
228- Ok ( _) => break ,
229- Err ( actual) => current_max = actual,
230- }
231- }
232- let completed = COMPLETED_PARTITIONS . fetch_add ( 1 , Ordering :: Relaxed ) + 1 ;
243+ let process_allocated = jemalloc_process_allocated ( ) ;
244+ let net = self . thread_total_allocated as i64
245+ - self . thread_total_deallocated as i64 ;
246+
247+ // Accumulate into global aggregates
248+ AGG_THREAD_ALLOCATED
249+ . fetch_add ( self . thread_total_allocated , Ordering :: Relaxed ) ;
250+ AGG_THREAD_DEALLOCATED
251+ . fetch_add ( self . thread_total_deallocated , Ordering :: Relaxed ) ;
252+ update_atomic_max ( & AGG_PEAK_PROCESS_ALLOCATED , process_allocated) ;
253+ AGG_TOTAL_ROWS . fetch_add ( self . total_rows as u64 , Ordering :: Relaxed ) ;
254+ let completed =
255+ AGG_COMPLETED_PARTITIONS . fetch_add ( 1 , Ordering :: Relaxed ) + 1 ;
233256
234257 log:: info!(
235258 "ScanMemoryLogExec spark_partition={}: scan complete, \
236259 batches={}, rows={}, \
237260 memory_pool_reserved={}, \
238- jemalloc_allocated: before ={}, at_eof ={}, peak ={}, \
239- total_delta={}, peak_delta ={} | \
240- aggregate: completed_partitions ={}, sum_total_delta ={}, \
241- max_peak ={}, sum_rows ={}",
261+ thread: allocated ={}, deallocated ={}, net ={}, \
262+ process_allocated ={} | \
263+ aggregate(n={}): thread_allocated ={}, thread_deallocated ={}, \
264+ thread_net ={}, max_process_allocated={}, total_rows ={}",
242265 self . spark_partition,
243266 self . batch_count,
244267 self . total_rows,
245268 pool. reserved( ) ,
246- self . allocated_before_scan,
247- allocated_at_eof,
248- self . peak_allocated,
249- total_delta,
250- peak_delta,
269+ self . thread_total_allocated,
270+ self . thread_total_deallocated,
271+ net,
272+ process_allocated,
251273 completed,
252- TOTAL_DELTA_SUM . load( Ordering :: Relaxed ) ,
253- MAX_PEAK . load( Ordering :: Relaxed ) ,
254- TOTAL_ROWS_SUM . load( Ordering :: Relaxed ) ,
274+ AGG_THREAD_ALLOCATED . load( Ordering :: Relaxed ) ,
275+ AGG_THREAD_DEALLOCATED . load( Ordering :: Relaxed ) ,
276+ AGG_THREAD_ALLOCATED . load( Ordering :: Relaxed ) as i64
277+ - AGG_THREAD_DEALLOCATED . load( Ordering :: Relaxed ) as i64 ,
278+ AGG_PEAK_PROCESS_ALLOCATED . load( Ordering :: Relaxed ) ,
279+ AGG_TOTAL_ROWS . load( Ordering :: Relaxed ) ,
255280 ) ;
256281 }
257282 _ => { }
0 commit comments