Skip to content

Commit eee9c59

Browse files
committed
Fix semaphore
1 parent bb24c2a commit eee9c59

1 file changed

Lines changed: 11 additions & 10 deletions

File tree

  • datafusion/physical-plan/src/joins/grace_hash_join

datafusion/physical-plan/src/joins/grace_hash_join/stream.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use futures::stream::FuturesUnordered;
5252
use futures::{ready, Stream, StreamExt};
5353
use log::{debug, info};
5454
use parking_lot::Mutex;
55-
use tokio::sync::OnceCell;
55+
use std::sync::OnceLock;
5656
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
5757

5858
/// Maximum number of partitions we allow after recursive repartitioning to
@@ -69,6 +69,11 @@ fn prefetch_cap_bytes(_current_limit: usize) -> usize {
6969
0
7070
}
7171

72+
fn global_join_semaphore() -> &'static Arc<Semaphore> {
73+
static SEM: OnceLock<Arc<Semaphore>> = OnceLock::new();
74+
SEM.get_or_init(|| Arc::new(Semaphore::new(2)))
75+
}
76+
7277
enum GraceJoinState {
7378
/// Waiting for the partitioning phase (Phase 1) to finish
7479
WaitPartitioning,
@@ -90,7 +95,7 @@ enum GraceJoinState {
9095
reservation: Arc<Mutex<MemoryReservation>>,
9196
/// Permit to limit concurrent in-memory joins across tasks
9297
join_permit: Option<Arc<OwnedSemaphorePermit>>,
93-
join_permit_fut: Option<OnceFut<Arc<OwnedSemaphorePermit>>>,
98+
join_permit_fut: Option<OnceFut<OwnedSemaphorePermit>>,
9499
current_join_start: Option<Instant>,
95100
repartition_fut: Option<OnceFut<Vec<PartitionWorkItem>>>,
96101
/// Prefetch for the next partition (at most one in-flight)
@@ -465,11 +470,6 @@ pub struct GraceHashJoinStream {
465470
state: GraceJoinState,
466471
}
467472

468-
fn global_join_semaphore() -> &'static Semaphore {
469-
static SEM: OnceCell<Semaphore> = OnceCell::const_new();
470-
SEM.get_or_init(|| Semaphore::new(2))
471-
}
472-
473473
#[derive(Debug, Clone)]
474474
pub struct SpillFut {
475475
left: Vec<PartitionIndex>,
@@ -677,15 +677,16 @@ impl GraceHashJoinStream {
677677
// Acquire global join permit before loading/joining to cap concurrent joins.
678678
if join_permit.is_none() {
679679
if join_permit_fut.is_none() {
680-
let sem = global_join_semaphore().clone();
680+
let sem = Arc::clone(global_join_semaphore());
681681
*join_permit_fut = Some(OnceFut::new(async move {
682-
Arc::new(sem.acquire_owned().await.unwrap())
682+
let permit = sem.acquire_owned().await.unwrap();
683+
Ok(permit)
683684
}));
684685
}
685686
if let Some(fut) = join_permit_fut.as_mut() {
686687
match fut.get_shared(cx) {
687688
Poll::Ready(Ok(permit)) => {
688-
*join_permit = Some(permit.clone());
689+
*join_permit = Some(permit);
689690
*join_permit_fut = None;
690691
}
691692
Poll::Ready(Err(e)) => {

0 commit comments

Comments
 (0)