Skip to content

Commit d9596e7

Browse files
committed
Add log to ghj stream memory managment
1 parent 6395907 commit d9596e7

1 file changed

Lines changed: 31 additions & 1 deletion

File tree

  • datafusion/physical-plan/src/joins/grace_hash_join

datafusion/physical-plan/src/joins/grace_hash_join/stream.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use crate::test::TestMemoryExec;
4242
use ahash::RandomState;
4343
use arrow::datatypes::SchemaRef;
4444
use arrow::record_batch::RecordBatch;
45+
use cfg_if::cfg_if;
4546
use datafusion_common::{JoinType, NullEquality, Result};
4647
use datafusion_execution::memory_pool::{
4748
human_readable_size, MemoryConsumer, MemoryReservation,
@@ -69,6 +70,27 @@ fn prefetch_cap_bytes(_current_limit: usize) -> usize {
6970
0
7071
}
7172

73+
cfg_if! {
74+
if #[cfg(target_os = "linux")] {
75+
fn current_rss_bytes() -> Option<u64> {
76+
let statm = std::fs::read_to_string("/proc/self/statm").ok()?;
77+
let mut parts = statm.split_whitespace();
78+
// statm: size resident shared text lib data dt
79+
let _size_pages = parts.next()?;
80+
let resident_pages = parts.next()?.parse::<u64>().ok()?;
81+
let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
82+
if page_size <= 0 {
83+
return None;
84+
}
85+
Some(resident_pages.saturating_mul(page_size as u64))
86+
}
87+
} else {
88+
fn current_rss_bytes() -> Option<u64> {
89+
None
90+
}
91+
}
92+
}
93+
7294
fn global_join_semaphore() -> &'static Arc<Semaphore> {
7395
static SEM: OnceLock<Arc<Semaphore>> = OnceLock::new();
7496
SEM.get_or_init(|| Arc::new(Semaphore::new(1)))
@@ -687,6 +709,14 @@ impl GraceHashJoinStream {
687709
match fut.get_shared(cx) {
688710
Poll::Ready(Ok(permit)) => {
689711
*join_permit = Some(permit);
712+
if let Some(rss) = current_rss_bytes() {
713+
info!(
714+
"Grace hash join acquired join permit for partition {} (pass {}), rss={}",
715+
current_work.as_ref().map(|w| w.partition_id).unwrap_or(usize::MAX),
716+
current_work.as_ref().map(|w| w.pass).unwrap_or(usize::MAX),
717+
human_readable_size(rss as usize)
718+
);
719+
}
690720
*join_permit_fut = None;
691721
}
692722
Poll::Ready(Err(e)) => {
@@ -1254,7 +1284,7 @@ impl GraceHashJoinStream {
12541284
let freed = res.free();
12551285
debug!(
12561286
"Grace hash join stream completion freed {} after shrink failure (requested {})",
1257-
human_readable_size(freed),
1287+
human_readable_size(freed),
12581288
human_readable_size(bytes_to_free)
12591289
);
12601290
}

0 commit comments

Comments
 (0)