Skip to content

Commit d3e2e78

Browse files
committed
clean up round 1
1 parent df0a798 commit d3e2e78

5 files changed

Lines changed: 61 additions & 223 deletions

File tree

datafusion/core/src/physical_planner.rs

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ use crate::physical_plan::explain::ExplainExec;
4242
use crate::physical_plan::filter::FilterExecBuilder;
4343
use crate::physical_plan::joins::utils as join_utils;
4444
use crate::physical_plan::joins::{
45-
CrossJoinExec, HashJoinExec, JoinAcceleratorBuilder, JoinAcceleratorSpec,
46-
NestedLoopJoinExecBuilder, PartitionMode, SortMergeJoinExec,
45+
CrossJoinExec, HashJoinExec, NestedLoopJoinExecBuilder, PartitionMode,
46+
SortMergeJoinExec,
4747
};
4848
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
4949
use crate::physical_plan::projection::{ProjectionExec, ProjectionExpr};
@@ -71,8 +71,8 @@ use datafusion_common::tree_node::{
7171
Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
7272
};
7373
use datafusion_common::{
74-
DFSchema, DFSchemaRef, JoinSide, ScalarValue, exec_err, internal_datafusion_err,
75-
internal_err, not_impl_err, plan_err,
74+
DFSchema, DFSchemaRef, ScalarValue, exec_err, internal_datafusion_err, internal_err,
75+
not_impl_err, plan_err,
7676
};
7777
use datafusion_common::{
7878
TableReference, assert_eq_or_internal_err, assert_or_internal_err,
@@ -2376,20 +2376,9 @@ fn build_nested_loop_join(
23762376
join_filter: Option<join_utils::JoinFilter>,
23772377
join_type: JoinType,
23782378
) -> Result<Arc<dyn ExecutionPlan>> {
2379-
let left_schema = physical_left.schema();
2380-
let right_schema = physical_right.schema();
2381-
let accelerator = JoinAcceleratorBuilder::try_new(JoinAcceleratorSpec::new(
2382-
join_type,
2383-
JoinSide::Left,
2384-
Arc::clone(&left_schema),
2385-
Arc::clone(&right_schema),
2386-
join_filter.clone(),
2387-
))?;
2388-
23892379
Ok(Arc::new(
23902380
NestedLoopJoinExecBuilder::new(physical_left, physical_right, join_type)
23912381
.with_filter(join_filter)
2392-
.with_accelerator(accelerator)
23932382
.build()?,
23942383
))
23952384
}

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3773,8 +3773,6 @@ impl PartialOrd for Aggregate {
37733773
#[allow(clippy::allow_attributes, clippy::mutable_key_type)] // Expr contains Arc with interior mutability but is intentionally used as hash key
37743774
fn max_grouping_set_duplicate_ordinal(group_expr: &[Expr]) -> usize {
37753775
if let Some(Expr::GroupingSet(GroupingSet::GroupingSets(sets))) = group_expr.first() {
3776-
#[allow(clippy::allow_attributes, clippy::mutable_key_type)]
3777-
// Expr contains Arc with interior mutability but is intentionally used as a hash key.
37783776
let mut counts: HashMap<&[Expr], usize> = HashMap::new();
37793777
for set in sets {
37803778
*counts.entry(set).or_insert(0) += 1;

datafusion/physical-plan/src/joins/join_accelerator.rs

Lines changed: 31 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! Join accelerator interfaces used by [`crate::joins::NestedLoopJoinExec`].
19-
//!
20-
//! The default probing implementation is a naive cartesian fallback that
21-
//! enumerates all build-side rows for each probe row in fixed-size chunks.
18+
//! Join accelerator interfaces used by [`crate::joins::NestedLoopJoinExec`]; see
19+
//! the comments on [`JoinAccelerator`] for details.
2220
2321
use std::cmp::min;
2422
use std::fmt::Debug;
2523
use std::ops::Range;
2624
use std::sync::Arc;
2725

28-
use arrow::array::{UInt32Array, UInt64Array};
2926
use arrow::compute::concat_batches;
3027
use arrow::datatypes::SchemaRef;
3128
use arrow::record_batch::RecordBatch;
@@ -36,20 +33,24 @@ use super::join_filter::JoinFilter;
3633
use datafusion_common::{Result, internal_datafusion_err, internal_err};
3734

3835
/// Shared reference to a selected join accelerator.
39-
pub type JoinAcceleratorRef = Arc<dyn JoinAccelerator>;
36+
pub(crate) type JoinAcceleratorRef = Arc<dyn JoinAccelerator>;
4037

4138
/// Planning-time specification used to select a join accelerator.
4239
#[derive(Debug, Clone)]
43-
pub struct JoinAcceleratorSpec {
40+
pub(crate) struct JoinAcceleratorSpec {
41+
#[expect(dead_code)]
42+
// Kept for accelerator selection once non-fallback implementations are enabled.
4443
join_type: JoinType,
4544
build_side: JoinSide,
4645
left_schema: SchemaRef,
4746
right_schema: SchemaRef,
47+
#[expect(dead_code)]
48+
// Kept for accelerator selection once non-fallback implementations are enabled.
4849
filter: Option<JoinFilter>,
4950
}
5051

5152
impl JoinAcceleratorSpec {
52-
pub fn new(
53+
pub(crate) fn new(
5354
join_type: JoinType,
5455
build_side: JoinSide,
5556
left_schema: SchemaRef,
@@ -65,27 +66,7 @@ impl JoinAcceleratorSpec {
6566
}
6667
}
6768

68-
pub fn join_type(&self) -> JoinType {
69-
self.join_type
70-
}
71-
72-
pub fn build_side(&self) -> JoinSide {
73-
self.build_side
74-
}
75-
76-
pub fn left_schema(&self) -> &SchemaRef {
77-
&self.left_schema
78-
}
79-
80-
pub fn right_schema(&self) -> &SchemaRef {
81-
&self.right_schema
82-
}
83-
84-
pub fn filter(&self) -> Option<&JoinFilter> {
85-
self.filter.as_ref()
86-
}
87-
88-
pub fn build_schema(&self) -> &SchemaRef {
69+
pub(crate) fn build_schema(&self) -> &SchemaRef {
8970
match self.build_side {
9071
JoinSide::Left => &self.left_schema,
9172
JoinSide::Right => &self.right_schema,
@@ -96,13 +77,13 @@ impl JoinAcceleratorSpec {
9677

9778
/// Selects a planning-time join accelerator.
9879
#[derive(Debug, Default)]
99-
pub struct JoinAcceleratorBuilder;
80+
pub(crate) struct JoinAcceleratorBuilder;
10081

10182
impl JoinAcceleratorBuilder {
10283
/// Select the accelerator for a nested loop join.
10384
///
10485
/// This always succeeds because NLJ has a naive cartesian fallback.
105-
pub fn try_new(spec: JoinAcceleratorSpec) -> Result<JoinAcceleratorRef> {
86+
pub(crate) fn try_new(spec: JoinAcceleratorSpec) -> Result<JoinAcceleratorRef> {
10687
Ok(Arc::new(FallbackNestedLoopJoinAccelerator::new(spec)))
10788
}
10889
}
@@ -191,7 +172,13 @@ impl JoinAcceleratorBuilder {
191172
/// }
192173
/// }
193174
/// ```
194-
pub trait JoinAccelerator: Debug + Send + Sync {
175+
///
176+
/// # Implementation Plan
177+
/// This trait is intended to become public. For now, keep it private while
178+
/// internal experiments stabilize the API.
179+
pub(crate) trait JoinAccelerator: Debug + Send + Sync {
180+
#[cfg_attr(not(test), expect(dead_code))]
181+
// Will be used in explain output when accelerator selection is visible.
195182
fn name(&self) -> &'static str;
196183

197184
/// Return `true` only if this accelerator supports the nested-loop join
@@ -253,19 +240,22 @@ pub trait JoinAccelerator: Debug + Send + Sync {
253240
}
254241

255242
/// Stateful batch-level probe cursor returned by a runtime [`JoinAccelerator`].
256-
pub trait JoinAcceleratorProber: Debug + Send {
243+
pub(crate) trait JoinAcceleratorProber: Debug + Send {
257244
/// Incrementally return the next candidate chunk for the active probe batch,
258245
/// returns `None` when the probing ends.
259246
///
260247
/// Candidate indices are relative to the accelerator's concatenated build
261248
/// batch and the prepared probe batch returned by [`JoinAccelerator::init_prober`].
262249
///
263-
/// Implementations should:
264-
/// - Try to emit as many matches as possible at once for efficiency
265-
/// - Emit at most `batch_size` matches in one iteration, to make subsequent
266-
/// operations more memory-efficient
250+
/// Efficiency guidelines (violating them won't cause an error):
267251
///
268-
/// TODO(PR): Currently NLJ assumes all exact `batch_size` matches. A new join
252+
/// - Try to emit as many matches as possible at once for internal efficiency
253+
/// - Emit at most `batch_size` matches per iteration. This lowers latency to
254+
/// first output. Downstream operators are typically optimized for the configured
255+
/// input batch size; larger batches can be slower to process or use more
256+
/// memory.
257+
///
258+
/// TODO: Currently NLJ assumes all exact `batch_size` matches. A new join
269259
/// index might output fewer, so the caller might want a `BatchCoalescer` to
270260
/// vectorize the later remaining join filter evaluation.
271261
fn probe(&mut self) -> Result<Option<JoinProbeCandidates>>;
@@ -279,87 +269,15 @@ pub trait JoinAcceleratorProber: Debug + Send {
279269
/// - Build-side indices are relative to the concatenated build-side batch from
280270
/// [`JoinAccelerator::build_batch`].
281271
#[derive(Debug, Clone)]
282-
pub enum JoinProbeCandidates {
272+
pub(crate) enum JoinProbeCandidates {
283273
/// Consecutive build rows for one probe row, represented as a range for
284274
/// efficiency in NLJ.
285275
BuildRange {
286276
probe_row: usize,
287277
build_range: Range<usize>,
288278
},
289-
/// Arbitrary build/probe row pairs.
290-
BuildIndices {
291-
build_indices: UInt64Array,
292-
probe_indices: UInt32Array,
293-
},
294-
}
295-
296-
impl JoinProbeCandidates {
297-
pub fn try_new_indices(
298-
build_indices: UInt64Array,
299-
probe_indices: UInt32Array,
300-
) -> Result<Self> {
301-
if build_indices.len() != probe_indices.len() {
302-
return internal_err!(
303-
"Join probe build/probe index lengths differ: {} vs {}",
304-
build_indices.len(),
305-
probe_indices.len()
306-
);
307-
}
308-
309-
Ok(Self::BuildIndices {
310-
build_indices,
311-
probe_indices,
312-
})
313-
}
314-
315-
/// Number of candidate build rows in this probe batch.
316-
pub fn len(&self) -> usize {
317-
match self {
318-
Self::BuildRange { build_range, .. } => build_range.len(),
319-
Self::BuildIndices { build_indices, .. } => build_indices.len(),
320-
}
321-
}
322-
323-
/// Returns `true` if this probe batch contains no candidate build rows.
324-
pub fn is_empty(&self) -> bool {
325-
self.len() == 0
326-
}
327-
328-
/// Convert the batch into explicit build/probe index arrays.
329-
pub fn into_indices(self) -> Result<(UInt64Array, UInt32Array)> {
330-
match self {
331-
Self::BuildRange {
332-
probe_row,
333-
build_range,
334-
} => {
335-
let build_indices = build_range
336-
.map(|index| {
337-
u64::try_from(index).map_err(|_| {
338-
internal_datafusion_err!(
339-
"Join probe range index does not fit into u64: {index}"
340-
)
341-
})
342-
})
343-
.collect::<Result<Vec<_>>>()?;
344-
let build_indices = UInt64Array::from(build_indices);
345-
let probe_row = u32::try_from(probe_row).map_err(|_| {
346-
internal_datafusion_err!(
347-
"Probe row index does not fit into u32: {probe_row}"
348-
)
349-
})?;
350-
let probe_indices = UInt32Array::from_iter_values(std::iter::repeat_n(
351-
probe_row,
352-
build_indices.len(),
353-
));
354-
Ok((build_indices, probe_indices))
355-
}
356-
Self::BuildIndices {
357-
build_indices,
358-
probe_indices,
359-
..
360-
} => Ok((build_indices, probe_indices)),
361-
}
362-
}
279+
// Specialized joins will need a more general representation, like (build_index, probe_index)
280+
// pairs, so keep this an enum for now.
363281
}
364282

365283
#[derive(Debug, Clone)]

datafusion/physical-plan/src/joins/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use datafusion_physical_expr::PhysicalExprRef;
2323
pub use hash_join::{
2424
HashExpr, HashJoinExec, HashJoinExecBuilder, HashTableLookupExpr, SeededRandomState,
2525
};
26-
pub use join_accelerator::{
26+
use join_accelerator::{
2727
JoinAccelerator, JoinAcceleratorBuilder, JoinAcceleratorProber, JoinAcceleratorRef,
2828
JoinAcceleratorSpec, JoinProbeCandidates,
2929
};

0 commit comments

Comments
 (0)