Skip to content

Commit 4b0a66b

Browse files
committed
Pass projection and filter
1 parent 2e85309 commit 4b0a66b

3 files changed

Lines changed: 48 additions & 58 deletions

File tree

datafusion/physical-plan/src/joins/grace_hash_join/exec.rs

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,8 @@ impl ExecutionPlan for GraceHashJoinExec {
684684
spill_right,
685685
on_left,
686686
on_right,
687-
self.random_state.clone(),
687+
self.projection.clone(),
688+
self.filter.clone(),
688689
self.join_type,
689690
column_indices_after_projection,
690691
join_metrics,
@@ -854,24 +855,8 @@ impl ExecutionPlan for GraceHashJoinExec {
854855
}
855856
}
856857

857-
/// Accumulator for collecting min/max bounds from build-side data during hash join.
858-
///
859-
/// This struct encapsulates the logic for progressively computing column bounds
860-
/// (minimum and maximum values) for a specific join key expression as batches
861-
/// are processed during the build phase of a hash join.
862-
///
863-
/// The bounds are used for dynamic filter pushdown optimization, where filters
864-
/// based on the actual data ranges can be pushed down to the probe side to
865-
/// eliminate unnecessary data early.
866-
struct CollectLeftAccumulator {
867-
/// The physical expression to evaluate for each batch
868-
expr: Arc<dyn PhysicalExpr>,
869-
/// Accumulator for tracking the minimum value across all batches
870-
min: MinAccumulator,
871-
/// Accumulator for tracking the maximum value across all batches
872-
max: MaxAccumulator,
873-
}
874858

859+
#[allow(clippy::too_many_arguments)]
875860
pub async fn partition_and_spill(
876861
random_state: RandomState,
877862
on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
@@ -915,6 +900,7 @@ pub async fn partition_and_spill(
915900
Ok((left_index, right_index))
916901
}
917902

903+
#[allow(clippy::too_many_arguments)]
918904
async fn partition_and_spill_one_side(
919905
input: &mut SendableRecordBatchStream,
920906
on_exprs: &[PhysicalExprRef],

datafusion/physical-plan/src/joins/grace_hash_join/stream.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use crate::empty::EmptyExec;
3434
use crate::joins::grace_hash_join::exec::PartitionIndex;
3535
use crate::joins::{HashJoinExec, PartitionMode};
3636
use crate::test::TestMemoryExec;
37-
use ahash::RandomState;
3837
use arrow::datatypes::{Schema, SchemaRef};
3938
use arrow::record_batch::RecordBatch;
4039
use datafusion_common::{JoinType, NullEquality, Result};
@@ -70,7 +69,8 @@ pub struct GraceHashJoinStream {
7069
spill_right: Arc<SpillManager>,
7170
on_left: Vec<PhysicalExprRef>,
7271
on_right: Vec<PhysicalExprRef>,
73-
random_state: RandomState,
72+
projection: Option<Vec<usize>>,
73+
filter: Option<JoinFilter>,
7474
join_type: JoinType,
7575
column_indices: Vec<ColumnIndex>,
7676
join_metrics: Arc<BuildProbeJoinMetrics>,
@@ -113,7 +113,8 @@ impl GraceHashJoinStream {
113113
spill_right: Arc<SpillManager>,
114114
on_left: Vec<PhysicalExprRef>,
115115
on_right: Vec<PhysicalExprRef>,
116-
random_state: RandomState,
116+
projection: Option<Vec<usize>>,
117+
filter: Option<JoinFilter>,
117118
join_type: JoinType,
118119
column_indices: Vec<ColumnIndex>,
119120
join_metrics: Arc<BuildProbeJoinMetrics>,
@@ -127,7 +128,8 @@ impl GraceHashJoinStream {
127128
spill_right,
128129
on_left,
129130
on_right,
130-
random_state,
131+
projection,
132+
filter,
131133
join_type,
132134
column_indices,
133135
join_metrics,
@@ -225,7 +227,8 @@ impl GraceHashJoinStream {
225227
right_batches,
226228
&self.on_left,
227229
&self.on_right,
228-
self.random_state.clone(),
230+
self.projection.clone(),
231+
self.filter.clone(),
229232
self.join_type,
230233
&self.column_indices,
231234
&self.join_metrics,
@@ -283,10 +286,11 @@ fn build_in_memory_join_stream(
283286
right_batches: Vec<RecordBatch>,
284287
on_left: &[PhysicalExprRef],
285288
on_right: &[PhysicalExprRef],
286-
random_state: RandomState,
289+
projection: Option<Vec<usize>>,
290+
filter: Option<JoinFilter>,
287291
join_type: JoinType,
288-
column_indices: &[ColumnIndex],
289-
join_metrics: &BuildProbeJoinMetrics,
292+
_column_indices: &[ColumnIndex],
293+
_join_metrics: &BuildProbeJoinMetrics,
290294
context: &Arc<TaskContext>,
291295
) -> Result<SendableRecordBatchStream> {
292296
if left_batches.is_empty() && right_batches.is_empty() {
@@ -324,9 +328,9 @@ fn build_in_memory_join_stream(
324328
left_plan,
325329
right_plan,
326330
on,
327-
None::<JoinFilter>,
331+
filter,
328332
&join_type,
329-
None,
333+
projection,
330334
PartitionMode::CollectLeft,
331335
NullEquality::NullEqualsNothing,
332336
)?;

datafusion/physical-plan/src/spill/spill_manager.rs

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -173,37 +173,37 @@ impl SpillManager {
173173
/// Automatically decides whether to spill the given RecordBatch to memory or disk,
174174
/// depending on available memory pool capacity.
175175
pub(crate) fn spill_batch_auto(&self, batch: &RecordBatch, request_msg: &str) -> Result<SpillLocation> {
176-
let Some(file) = self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)? else {
177-
return Err(DataFusionError::Execution(
178-
"failed to spill batch to disk".into(),
179-
));
180-
};
181-
Ok(SpillLocation::Disk(Arc::new(file)))
182-
// //
183-
// let size = batch.get_sliced_size()?;
184-
//
185-
// // Check current memory usage and total limit from the runtime memory pool
186-
// let used = self.env.memory_pool.reserved();
187-
// let limit = match self.env.memory_pool.memory_limit() {
188-
// datafusion_execution::memory_pool::MemoryLimit::Finite(l) => l,
189-
// _ => usize::MAX,
176+
// let Some(file) = self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)? else {
177+
// return Err(DataFusionError::Execution(
178+
// "failed to spill batch to disk".into(),
179+
// ));
190180
// };
191-
// println!("size {size} used {used}");
192-
// // If there's enough memory (with a small safety margin), keep it in memory
193-
// if used + size * 3 * 64 / 2 <= limit {
194-
// let buf = Arc::new(InMemorySpillBuffer::from_batch(batch)?);
195-
// self.metrics.spilled_bytes.add(size);
196-
// self.metrics.spilled_rows.add(batch.num_rows());
197-
// Ok(SpillLocation::Memory(buf))
198-
// } else {
199-
// // Otherwise spill to disk using the existing SpillManager logic
200-
// let Some(file) = self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)? else {
201-
// return Err(DataFusionError::Execution(
202-
// "failed to spill batch to disk".into(),
203-
// ));
204-
// };
205-
// Ok(SpillLocation::Disk(Arc::new(file)))
206-
// }
181+
// Ok(SpillLocation::Disk(Arc::new(file)))
182+
// //
183+
let size = batch.get_sliced_size()?;
184+
185+
// Check current memory usage and total limit from the runtime memory pool
186+
let used = self.env.memory_pool.reserved();
187+
let limit = match self.env.memory_pool.memory_limit() {
188+
datafusion_execution::memory_pool::MemoryLimit::Finite(l) => l,
189+
_ => usize::MAX,
190+
};
191+
192+
// If there's enough memory (with a safety margin), keep it in memory
193+
if used + size * 3 / 2 <= limit {
194+
let buf = Arc::new(InMemorySpillBuffer::from_batch(batch)?);
195+
self.metrics.spilled_bytes.add(size);
196+
self.metrics.spilled_rows.add(batch.num_rows());
197+
Ok(SpillLocation::Memory(buf))
198+
} else {
199+
// Otherwise spill to disk using the existing SpillManager logic
200+
let Some(file) = self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)? else {
201+
return Err(DataFusionError::Execution(
202+
"failed to spill batch to disk".into(),
203+
));
204+
};
205+
Ok(SpillLocation::Disk(Arc::new(file)))
206+
}
207207
}
208208

209209
pub fn spill_batches_auto(

0 commit comments

Comments (0)