Skip to content

Commit f7a8a64

Browse files
committed
chore: Memory scan log exec experiments
1 parent 7a99b40 commit f7a8a64

1 file changed

Lines changed: 34 additions & 16 deletions

File tree

native/core/src/execution/operators/scan_memory_log.rs

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,20 @@ fn jemalloc_thread_stats() -> (u64, u64) {
6363
(0, 0)
6464
}
6565

66-
/// Process-wide jemalloc allocated bytes.
66+
/// Process-wide jemalloc stats: (allocated, active, resident).
6767
#[cfg(feature = "jemalloc")]
68-
fn jemalloc_process_allocated() -> u64 {
68+
fn jemalloc_process_stats() -> (u64, u64, u64) {
6969
use tikv_jemalloc_ctl::{epoch, stats};
7070
epoch::advance().ok();
71-
stats::allocated::read().unwrap_or(0) as u64
71+
let allocated = stats::allocated::read().unwrap_or(0) as u64;
72+
let active = stats::active::read().unwrap_or(0) as u64;
73+
let resident = stats::resident::read().unwrap_or(0) as u64;
74+
(allocated, active, resident)
7275
}
7376

7477
#[cfg(not(feature = "jemalloc"))]
75-
fn jemalloc_process_allocated() -> u64 {
76-
0
78+
fn jemalloc_process_stats() -> (u64, u64, u64) {
79+
(0, 0, 0)
7780
}
7881

7982
fn update_atomic_max(atomic: &AtomicU64, value: u64) {
@@ -178,6 +181,7 @@ impl ExecutionPlan for ScanMemoryLogExec {
178181
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
179182
Ok(Box::pin(ScanMemoryLogStream {
180183
child: child_stream,
184+
child_plan: Arc::clone(&self.child),
181185
context,
182186
spark_partition: self.spark_partition,
183187
logged: false,
@@ -205,6 +209,8 @@ impl ExecutionPlan for ScanMemoryLogExec {
205209

206210
struct ScanMemoryLogStream {
207211
child: SendableRecordBatchStream,
212+
/// Reference to child plan to read its metrics at EOF
213+
child_plan: Arc<dyn ExecutionPlan>,
208214
context: Arc<TaskContext>,
209215
spark_partition: i32,
210216
logged: bool,
@@ -240,36 +246,48 @@ impl Stream for ScanMemoryLogStream {
240246
Poll::Ready(None) if !self.logged => {
241247
self.logged = true;
242248
let pool = self.context.memory_pool();
243-
let process_allocated = jemalloc_process_allocated();
244-
let net = self.thread_total_allocated as i64
245-
- self.thread_total_deallocated as i64;
249+
let (process_allocated, process_active, process_resident) =
250+
jemalloc_process_stats();
251+
let net = self.thread_total_allocated as i64 - self.thread_total_deallocated as i64;
252+
253+
// Read bytes_scanned from child plan's metrics
254+
let bytes_scanned = self
255+
.child_plan
256+
.metrics()
257+
.map(|m| {
258+
m.iter()
259+
.filter(|metric| metric.value().name() == "bytes_scanned")
260+
.map(|metric| metric.value().as_usize())
261+
.sum::<usize>()
262+
})
263+
.unwrap_or(0);
246264

247265
// Accumulate into global aggregates
248-
AGG_THREAD_ALLOCATED
249-
.fetch_add(self.thread_total_allocated, Ordering::Relaxed);
250-
AGG_THREAD_DEALLOCATED
251-
.fetch_add(self.thread_total_deallocated, Ordering::Relaxed);
266+
AGG_THREAD_ALLOCATED.fetch_add(self.thread_total_allocated, Ordering::Relaxed);
267+
AGG_THREAD_DEALLOCATED.fetch_add(self.thread_total_deallocated, Ordering::Relaxed);
252268
update_atomic_max(&AGG_PEAK_PROCESS_ALLOCATED, process_allocated);
253269
AGG_TOTAL_ROWS.fetch_add(self.total_rows as u64, Ordering::Relaxed);
254-
let completed =
255-
AGG_COMPLETED_PARTITIONS.fetch_add(1, Ordering::Relaxed) + 1;
270+
let completed = AGG_COMPLETED_PARTITIONS.fetch_add(1, Ordering::Relaxed) + 1;
256271

257272
log::info!(
258273
"ScanMemoryLogExec spark_partition={}: scan complete, \
259-
batches={}, rows={}, \
274+
batches={}, rows={}, bytes_scanned={}, \
260275
memory_pool_reserved={}, \
261276
thread: allocated={}, deallocated={}, net={}, \
262-
process_allocated={} | \
277+
process: allocated={}, active={}, resident={} | \
263278
aggregate(n={}): thread_allocated={}, thread_deallocated={}, \
264279
thread_net={}, max_process_allocated={}, total_rows={}",
265280
self.spark_partition,
266281
self.batch_count,
267282
self.total_rows,
283+
bytes_scanned,
268284
pool.reserved(),
269285
self.thread_total_allocated,
270286
self.thread_total_deallocated,
271287
net,
272288
process_allocated,
289+
process_active,
290+
process_resident,
273291
completed,
274292
AGG_THREAD_ALLOCATED.load(Ordering::Relaxed),
275293
AGG_THREAD_DEALLOCATED.load(Ordering::Relaxed),

0 commit comments

Comments
 (0)