Skip to content

Commit 78760a6

Browse files
committed
chore: Memory scan log exec experiments
1 parent e188c66 commit 78760a6

3 files changed

Lines changed: 272 additions & 0 deletions

File tree

native/core/src/execution/operators/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ pub use parquet_writer::ParquetWriterExec;
3434
mod csv_scan;
3535
pub mod projection;
3636
mod scan;
37+
mod scan_memory_log;
3738
pub use csv_scan::init_csv_datasource_exec;
39+
pub use scan_memory_log::ScanMemoryLogExec;
3840

3941
/// Error returned during executing operators.
4042
#[derive(thiserror::Error, Debug)]
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! A transparent wrapper around a scan ExecutionPlan that logs memory stats
19+
//! when the scan stream reaches EOF. Uses jemalloc stats (when available)
20+
//! to track how much memory was allocated during the scan phase.
21+
//!
22+
//! Reads jemalloc `allocated` at three kinds of points:
23+
//! 1. Before `child.execute()` — baseline
24+
//! 2. After each batch yielded by the child — tracks the peak seen during the scan
25+
//! 3. On stream EOF — total cost of the entire scan, including stream setup and all data reads
26+
//!
27+
//! Note: jemalloc `allocated` is process-wide, so concurrent partitions will
28+
//! see each other's allocations. The deltas are approximate but still useful
29+
//! for identifying which partitions drive memory growth.
30+
31+
use arrow::array::RecordBatch;
32+
use arrow::datatypes::SchemaRef;
33+
use datafusion::common::Result as DataFusionResult;
34+
use datafusion::execution::TaskContext;
35+
use datafusion::physical_expr::EquivalenceProperties;
36+
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
37+
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
38+
use datafusion::physical_plan::{
39+
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
40+
RecordBatchStream, SendableRecordBatchStream,
41+
};
42+
use futures::Stream;
43+
use std::any::Any;
44+
use std::pin::Pin;
45+
use std::sync::atomic::{AtomicUsize, Ordering};
46+
use std::sync::Arc;
47+
use std::task::{Context, Poll};
48+
49+
// Process-wide aggregates shared by every `ScanMemoryLogStream`. Each stream
// updates them once, when it reaches EOF. Nothing resets them, so concurrent or
// successive queries in the same process accumulate into the same totals.

/// Sum over finished partitions of the per-partition `total_delta`
/// (jemalloc bytes at EOF minus bytes before `child.execute()`).
static TOTAL_DELTA_SUM: AtomicUsize = AtomicUsize::new(0);

/// Largest per-partition peak of jemalloc allocated bytes reported so far.
static MAX_PEAK: AtomicUsize = AtomicUsize::new(0);

/// Number of scan streams that have reached EOF.
static COMPLETED_PARTITIONS: AtomicUsize = AtomicUsize::new(0);

/// Sum of the row counts of all scan streams that have reached EOF.
static TOTAL_ROWS_SUM: AtomicUsize = AtomicUsize::new(0);
55+
56+
/// Returns jemalloc's `stats::allocated` value: the number of bytes currently
/// allocated by the application.
///
/// The read is best-effort: a failed epoch advance is ignored, and a failed
/// statistics read is reported as 0. When the `jemalloc` feature is disabled a
/// fallback with the same name is compiled instead and always returns 0.
#[cfg(feature = "jemalloc")]
fn jemalloc_allocated() -> usize {
    // Advance the epoch before reading so the statistic is refreshed
    // (see the tikv-jemalloc-ctl `epoch` docs); the outcome is deliberately ignored.
    let _ = tikv_jemalloc_ctl::epoch::advance();
    tikv_jemalloc_ctl::stats::allocated::read().unwrap_or_default()
}
64+
65+
/// Fallback used when the `jemalloc` feature is disabled: no allocator
/// statistics are available, so every reading is reported as 0.
#[cfg(not(feature = "jemalloc"))]
fn jemalloc_allocated() -> usize {
    usize::MIN
}
69+
70+
/// Wraps a child ExecutionPlan and logs memory stats when the child's stream
71+
/// reaches EOF. Passes through all batches unchanged.
72+
///
73+
/// `spark_partition` is the actual Spark partition index (not the DataFusion
74+
/// partition, which is always 0 in Comet's per-partition execution model).
75+
#[derive(Debug)]
76+
pub struct ScanMemoryLogExec {
77+
child: Arc<dyn ExecutionPlan>,
78+
spark_partition: i32,
79+
cache: PlanProperties,
80+
metrics: ExecutionPlanMetricsSet,
81+
}
82+
83+
impl ScanMemoryLogExec {
84+
pub fn new(child: Arc<dyn ExecutionPlan>, spark_partition: i32) -> Self {
85+
let cache = PlanProperties::new(
86+
EquivalenceProperties::new(child.schema()),
87+
child.output_partitioning().clone(),
88+
EmissionType::Final,
89+
Boundedness::Bounded,
90+
);
91+
Self {
92+
child,
93+
spark_partition,
94+
cache,
95+
metrics: ExecutionPlanMetricsSet::default(),
96+
}
97+
}
98+
}
99+
100+
impl DisplayAs for ScanMemoryLogExec {
101+
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
102+
match t {
103+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
104+
write!(
105+
f,
106+
"ScanMemoryLogExec: spark_partition={}",
107+
self.spark_partition
108+
)
109+
}
110+
DisplayFormatType::TreeRender => unimplemented!(),
111+
}
112+
}
113+
}
114+
115+
impl ExecutionPlan for ScanMemoryLogExec {
116+
fn as_any(&self) -> &dyn Any {
117+
self
118+
}
119+
120+
fn schema(&self) -> SchemaRef {
121+
self.child.schema()
122+
}
123+
124+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
125+
vec![&self.child]
126+
}
127+
128+
fn with_new_children(
129+
self: Arc<Self>,
130+
mut children: Vec<Arc<dyn ExecutionPlan>>,
131+
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
132+
Ok(Arc::new(ScanMemoryLogExec::new(
133+
children.remove(0),
134+
self.spark_partition,
135+
)))
136+
}
137+
138+
fn execute(
139+
&self,
140+
partition: usize,
141+
context: Arc<TaskContext>,
142+
) -> DataFusionResult<SendableRecordBatchStream> {
143+
// Snapshot before child.execute() — baseline
144+
let allocated_before_execute = jemalloc_allocated();
145+
146+
let child_stream = self.child.execute(partition, Arc::clone(&context))?;
147+
148+
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
149+
Ok(Box::pin(ScanMemoryLogStream {
150+
child: child_stream,
151+
context,
152+
spark_partition: self.spark_partition,
153+
logged: false,
154+
baseline_metrics,
155+
allocated_before_scan: allocated_before_execute,
156+
peak_allocated: allocated_before_execute,
157+
batch_count: 0,
158+
total_rows: 0,
159+
}))
160+
}
161+
162+
fn properties(&self) -> &PlanProperties {
163+
&self.cache
164+
}
165+
166+
fn name(&self) -> &str {
167+
"ScanMemoryLogExec"
168+
}
169+
170+
fn metrics(&self) -> Option<MetricsSet> {
171+
Some(self.metrics.clone_inner())
172+
}
173+
}
174+
175+
struct ScanMemoryLogStream {
176+
child: SendableRecordBatchStream,
177+
context: Arc<TaskContext>,
178+
spark_partition: i32,
179+
logged: bool,
180+
baseline_metrics: BaselineMetrics,
181+
/// jemalloc allocated bytes captured before child.execute()
182+
allocated_before_scan: usize,
183+
/// High-water mark of jemalloc allocated bytes seen during polling
184+
peak_allocated: usize,
185+
/// Number of batches consumed through this stream
186+
batch_count: usize,
187+
/// Total rows consumed through this stream
188+
total_rows: usize,
189+
}
190+
191+
impl Stream for ScanMemoryLogStream {
192+
type Item = DataFusionResult<RecordBatch>;
193+
194+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
195+
let result = Pin::new(&mut self.child).poll_next(cx);
196+
match &result {
197+
Poll::Ready(Some(Ok(batch))) => {
198+
self.batch_count += 1;
199+
self.total_rows += batch.num_rows();
200+
self.baseline_metrics.record_output(batch.num_rows());
201+
// Track peak allocation across all batches
202+
let current = jemalloc_allocated();
203+
if current > self.peak_allocated {
204+
self.peak_allocated = current;
205+
}
206+
}
207+
Poll::Ready(None) if !self.logged => {
208+
self.logged = true;
209+
let pool = self.context.memory_pool();
210+
let allocated_at_eof = jemalloc_allocated();
211+
let total_delta =
212+
allocated_at_eof.saturating_sub(self.allocated_before_scan);
213+
let peak_delta =
214+
self.peak_allocated.saturating_sub(self.allocated_before_scan);
215+
216+
// Accumulate into global counters
217+
TOTAL_DELTA_SUM.fetch_add(total_delta, Ordering::Relaxed);
218+
TOTAL_ROWS_SUM.fetch_add(self.total_rows, Ordering::Relaxed);
219+
// Update max peak (atomic CAS loop)
220+
let mut current_max = MAX_PEAK.load(Ordering::Relaxed);
221+
while self.peak_allocated > current_max {
222+
match MAX_PEAK.compare_exchange_weak(
223+
current_max,
224+
self.peak_allocated,
225+
Ordering::Relaxed,
226+
Ordering::Relaxed,
227+
) {
228+
Ok(_) => break,
229+
Err(actual) => current_max = actual,
230+
}
231+
}
232+
let completed = COMPLETED_PARTITIONS.fetch_add(1, Ordering::Relaxed) + 1;
233+
234+
log::info!(
235+
"ScanMemoryLogExec spark_partition={}: scan complete, \
236+
batches={}, rows={}, \
237+
memory_pool_reserved={}, \
238+
jemalloc_allocated: before={}, at_eof={}, peak={}, \
239+
total_delta={}, peak_delta={} | \
240+
aggregate: completed_partitions={}, sum_total_delta={}, \
241+
max_peak={}, sum_rows={}",
242+
self.spark_partition,
243+
self.batch_count,
244+
self.total_rows,
245+
pool.reserved(),
246+
self.allocated_before_scan,
247+
allocated_at_eof,
248+
self.peak_allocated,
249+
total_delta,
250+
peak_delta,
251+
completed,
252+
TOTAL_DELTA_SUM.load(Ordering::Relaxed),
253+
MAX_PEAK.load(Ordering::Relaxed),
254+
TOTAL_ROWS_SUM.load(Ordering::Relaxed),
255+
);
256+
}
257+
_ => {}
258+
}
259+
result
260+
}
261+
}
262+
263+
impl RecordBatchStream for ScanMemoryLogStream {
264+
fn schema(&self) -> SchemaRef {
265+
self.child.schema()
266+
}
267+
}

native/core/src/execution/planner.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub mod operator_registry;
2323

2424
use crate::execution::operators::init_csv_datasource_exec;
2525
use crate::execution::operators::IcebergScanExec;
26+
use crate::execution::operators::ScanMemoryLogExec;
2627
use crate::{
2728
errors::ExpressionError,
2829
execution::{
@@ -1185,6 +1186,8 @@ impl PhysicalPlanner {
11851186
self.session_ctx(),
11861187
common.encryption_enabled,
11871188
)?;
1189+
let scan: Arc<dyn ExecutionPlan> =
1190+
Arc::new(ScanMemoryLogExec::new(scan, self.partition));
11881191
Ok((
11891192
vec![],
11901193
Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])),

0 commit comments

Comments
 (0)