Skip to content

Commit 7e176c0

Browse files
committed
Add global semaphore to serialize partitioning phase
1 parent 6f2a178 commit 7e176c0

2 files changed

Lines changed: 11 additions & 3 deletions

File tree

datafusion/physical-plan/src/joins/grace_hash_join/exec.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use std::any::Any;
4242
use std::fmt;
4343
use std::fmt::Formatter;
4444
use std::mem::size_of;
45-
use std::sync::Arc;
45+
use std::sync::{Arc, OnceLock};
4646

4747
use arrow::array::UInt32Array;
4848
use arrow::array::{Array, StringViewArray};
@@ -72,11 +72,17 @@ use datafusion_physical_expr_common::physical_expr::fmt_sql;
7272
use futures::StreamExt;
7373
use log::{debug, info};
7474
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
75+
use tokio::sync::Semaphore;
7576

7677
/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
7778
const HASH_JOIN_SEED: RandomState =
7879
RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
7980

81+
fn global_partition_semaphore() -> &'static Arc<Semaphore> {
82+
static SEM: OnceLock<Arc<Semaphore>> = OnceLock::new();
83+
SEM.get_or_init(|| Arc::new(Semaphore::new(1)))
84+
}
85+
8086
pub struct GraceHashJoinExec {
8187
/// left (build) side which gets hashed
8288
pub left: Arc<dyn ExecutionPlan>,
@@ -686,6 +692,8 @@ impl ExecutionPlan for GraceHashJoinExec {
686692
let join_metrics_clone = Arc::clone(&join_metrics);
687693
let context_for_partition = Arc::clone(&context);
688694
let spill_fut = OnceFut::new(async move {
695+
let sem = Arc::clone(global_partition_semaphore());
696+
let _partition_permit = sem.acquire_owned().await.unwrap();
689697
// Track memory used during the partitioning phase for this join
690698
let mut left_reservation =
691699
MemoryConsumer::new(format!("GraceHashJoinPartitionLeft[{partition}]"))

datafusion/physical-plan/src/joins/grace_hash_join/stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ use datafusion_execution::TaskContext;
5050
use datafusion_physical_expr::PhysicalExprRef;
5151
use futures::stream::FuturesUnordered;
5252
use futures::{ready, Stream, StreamExt};
53-
#[cfg(target_os = "linux")]
54-
use std::mem;
5553
use log::{debug, info};
5654
use parking_lot::Mutex;
55+
#[cfg(target_os = "linux")]
56+
use std::mem;
5757
use std::sync::OnceLock;
5858
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
5959

0 commit comments

Comments
 (0)