Skip to content

Commit 14034f3

Browse files
Route dynamic filters by partition compatibility
1 parent 62041a0 commit 14034f3

6 files changed

Lines changed: 491 additions & 113 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,10 @@ use datafusion_common::encryption::FileDecryptionProperties;
4343
use datafusion_common::stats::Precision;
4444
use datafusion_common::{
4545
ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err,
46+
tree_node::{Transformed, TransformedResult, TreeNode},
4647
};
4748
use datafusion_datasource::{PartitionedFile, TableSchema};
49+
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
4850
use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
4951
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
5052
use datafusion_physical_expr_common::physical_expr::{
@@ -597,7 +599,11 @@ impl ParquetMorselizer {
597599
));
598600

599601
let mut projection = self.projection.clone();
600-
let mut predicate = self.predicate.clone();
602+
let mut predicate = self
603+
.predicate
604+
.clone()
605+
.map(|p| rewrite_partition_index_dynamic_filters(p, self.partition_index))
606+
.transpose()?;
601607
if !literal_columns.is_empty() {
602608
projection = projection.try_map_exprs(|expr| {
603609
replace_columns_with_literals(Arc::clone(&expr), &literal_columns)
@@ -1589,6 +1595,27 @@ pub(crate) fn build_pruning_predicates(
15891595
)
15901596
}
15911597

1598+
/// Replaces partition-index dynamic filters with the filter for the parquet
1599+
/// execution partition currently opening a file.
///
/// Every [`DynamicFilterPhysicalExpr`] node in `predicate` for which
/// `partition_filter(partition_index)` yields `Some(expr)` is swapped for
/// `expr`. Nodes that are not dynamic filters, and dynamic filters whose
/// `partition_filter` yields `None`, are left unchanged.
///
/// # Errors
///
/// Propagates any error returned by `partition_filter`.
1600+
fn rewrite_partition_index_dynamic_filters(
1601+
predicate: Arc<dyn PhysicalExpr>,
1602+
partition_index: usize,
1603+
) -> Result<Arc<dyn PhysicalExpr>> {
1604+
predicate
1605+
.transform_up(|expr| {
1606+
let Some(dynamic_filter) = expr.downcast_ref::<DynamicFilterPhysicalExpr>()
1607+
else {
1608+
// Not a dynamic filter: keep the node as it is.
return Ok(Transformed::no(expr));
1609+
};
1610+
1611+
match dynamic_filter.partition_filter(partition_index)? {
1612+
Some(partition_expr) => Ok(Transformed::yes(partition_expr)),
1613+
// No per-partition entry for this index: keep the dynamic filter itself.
None => Ok(Transformed::no(expr)),
1614+
}
1615+
})
1616+
.data()
1617+
}
1618+
15921619
/// Returns a `ArrowReaderMetadata` with the page index loaded, loading
15931620
/// it from the underlying `AsyncFileReader` if necessary.
15941621
async fn load_page_index<T: AsyncFileReader>(
@@ -1637,7 +1664,7 @@ mod test {
16371664
use datafusion_expr::{col, lit};
16381665
use datafusion_physical_expr::{
16391666
PhysicalExpr,
1640-
expressions::{Column, DynamicFilterPhysicalExpr, Literal},
1667+
expressions::{Column, DynamicFilterPhysicalExpr, Literal, lit as physical_lit},
16411668
planner::logical2physical,
16421669
projection::ProjectionExprs,
16431670
};
@@ -2011,6 +2038,41 @@ mod test {
20112038
))
20122039
}
20132040

2041+
// Checks `rewrite_partition_index_dynamic_filters` against a dynamic filter
// updated with two partition entries: partition 0 has a filter (`a > 10`),
// partition 1 is `None`.
#[test]
2042+
fn test_rewrite_partition_index_dynamic_filters() {
2043+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2044+
let initial = logical2physical(&col("a").gt(lit(0)), &schema);
2045+
let partition_0 = logical2physical(&col("a").gt(lit(10)), &schema);
2046+
2047+
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
2048+
initial.children().into_iter().map(Arc::clone).collect(),
2049+
initial,
2050+
));
2051+
dynamic_filter
2052+
.update_partitioned(physical_lit(true), vec![Some(partition_0), None])
2053+
.unwrap();
2054+
2055+
// Partition 0: the dynamic filter is replaced by that partition's expression.
let rewritten_0 = rewrite_partition_index_dynamic_filters(
2056+
Arc::clone(&dynamic_filter) as Arc<dyn PhysicalExpr>,
2057+
0,
2058+
)
2059+
.unwrap();
2060+
assert_eq!(
2061+
format!("{rewritten_0:?}"),
2062+
r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Gt, right: Literal { value: Int32(10), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#
2063+
);
2064+
2065+
// Partition 1 was stored as `None`, so the rewrite yields a `false` literal.
let rewritten_1 = rewrite_partition_index_dynamic_filters(
2066+
dynamic_filter as Arc<dyn PhysicalExpr>,
2067+
1,
2068+
)
2069+
.unwrap();
2070+
assert_eq!(
2071+
format!("{rewritten_1:?}"),
2072+
r#"Literal { value: Boolean(false), field: Field { name: "lit", data_type: Boolean } }"#
2073+
);
2074+
}
2075+
20142076
#[tokio::test]
20152077
async fn test_prune_on_statistics() {
20162078
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 143 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
2020
use std::{fmt::Display, hash::Hash, sync::Arc};
2121
use tokio::sync::watch;
2222

23-
use crate::PhysicalExpr;
23+
use crate::{PhysicalExpr, expressions::lit};
2424
use arrow::datatypes::{DataType, Schema};
2525
use datafusion_common::{
2626
Result,
@@ -94,6 +94,11 @@ pub struct Inner {
9494
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
9595
pub generation: u64,
9696
pub expr: Arc<dyn PhysicalExpr>,
97+
/// Per-partition filter expressions for partition-index routing.
98+
/// When set (via [`DynamicFilterPhysicalExpr::update_partitioned`]), each
99+
/// entry corresponds to a build partition. `Some(expr)` means the partition
100+
/// has a filter. `None` means the partition was empty.
101+
pub partitioned_exprs: Option<Vec<Option<Arc<dyn PhysicalExpr>>>>,
97102
/// Flag for quick synchronous check if filter is complete.
98103
/// This is redundant with the watch channel state, but allows us to return immediately
99104
/// from `wait_complete()` without subscribing if already complete.
@@ -126,6 +131,7 @@ impl Inner {
126131
// This is not currently used anywhere but it seems useful to have this simple distinction.
127132
generation: 1,
128133
expr,
134+
partitioned_exprs: None,
129135
is_complete: false,
130136
}
131137
}
@@ -278,6 +284,7 @@ impl DynamicFilterPhysicalExpr {
278284
expression_id: current.expression_id,
279285
generation: new_generation,
280286
expr: new_expr,
287+
partitioned_exprs: None,
281288
is_complete: current.is_complete,
282289
};
283290
drop(current); // Release the lock before broadcasting
@@ -289,6 +296,83 @@ impl DynamicFilterPhysicalExpr {
289296
Ok(())
290297
}
291298

299+
/// Update this filter with a global fallback plus partition-local filters.
///
/// `global_expr` becomes the filter's main expression. `partitioned_exprs`
/// holds one entry per partition index: `Some(expr)` is that partition's
/// filter, `None` marks a partition without one (reported by
/// [`Self::partition_filter`] as `lit(false)`). Every expression is passed
/// through `remap_children` before it is stored.
///
/// The generation is incremented, the current `is_complete` flag is carried
/// over, and `FilterState::InProgress` with the new generation is sent on
/// the watch channel.
///
/// # Errors
///
/// Returns the error from `remap_children` if remapping any expression
/// fails. Remapping happens before the write lock is taken, so the stored
/// state is left untouched in that case.
300+
pub fn update_partitioned(
301+
&self,
302+
global_expr: Arc<dyn PhysicalExpr>,
303+
partitioned_exprs: Vec<Option<Arc<dyn PhysicalExpr>>>,
304+
) -> Result<()> {
305+
let global_expr = Self::remap_children(
306+
&self.children,
307+
self.remapped_children.as_ref(),
308+
global_expr,
309+
)?;
310+
// Remap each present partition expression; `None` entries stay `None`.
let partitioned_exprs = partitioned_exprs
311+
.into_iter()
312+
.map(|expr| {
313+
expr.map(|expr| {
314+
Self::remap_children(
315+
&self.children,
316+
self.remapped_children.as_ref(),
317+
expr,
318+
)
319+
})
320+
.transpose()
321+
})
322+
.collect::<Result<Vec<_>>>()?;
323+
324+
let mut current = self.inner.write();
325+
let new_generation = current.generation + 1;
326+
*current = Inner {
327+
expression_id: current.expression_id,
328+
generation: new_generation,
329+
expr: global_expr,
330+
partitioned_exprs: Some(partitioned_exprs),
331+
is_complete: current.is_complete,
332+
};
333+
// Release the write lock before broadcasting.
drop(current);
334+
335+
// The send result is deliberately discarded.
// NOTE(review): presumably it only fails when there are no receivers - confirm.
let _ = self.state_watch.send(FilterState::InProgress {
336+
generation: new_generation,
337+
});
338+
Ok(())
339+
}
340+
341+
/// Returns the filter expression for a specific partition, with children
342+
/// remapped to match any prior [`PhysicalExpr::with_new_children`] calls.
343+
///
344+
/// Returns `None` if no per-partition data has been stored or if `partition`
345+
/// is out of range of the stored entries. Returns `Some(lit(false))` if the
/// partition was empty.
///
/// # Errors
///
/// Returns the error from `remap_children` if remapping the stored
/// expression fails.
346+
pub fn partition_filter(
347+
&self,
348+
partition: usize,
349+
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
350+
// Outer `Option`: is there an entry for `partition` at all?
// Inner `Option`: did that partition store a filter expression?
// The entry is cloned out so the remapping below works on an owned value.
let partitioned_expr = self
351+
.inner
352+
.read()
353+
.partitioned_exprs
354+
.as_ref()
355+
.and_then(|partitioned_exprs| partitioned_exprs.get(partition))
356+
.cloned();
357+
358+
partitioned_expr
359+
.map(|expr| {
360+
expr.map_or_else(
361+
// Entry present but `None`: the partition was empty, so nothing matches.
|| Ok(Some(lit(false))),
362+
|expr| {
363+
Self::remap_children(
364+
&self.children,
365+
self.remapped_children.as_ref(),
366+
expr,
367+
)
368+
.map(Some)
369+
},
370+
)
371+
})
372+
.transpose()
373+
.map(Option::flatten)
374+
}
375+
292376
/// Mark this dynamic filter as complete and broadcast to all waiters.
293377
///
294378
/// This signals that all expected updates have been received.
@@ -700,6 +784,64 @@ mod test {
700784
assert_eq!(snapshot, Some(new_expr));
701785
}
702786

787+
// Covers the four outcomes of `partition_filter`: no per-partition data,
// a partition with a filter, a partition stored as `None`, and an index
// beyond the stored entries.
#[test]
788+
fn test_partition_filter() {
789+
let dynamic_filter =
790+
DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn PhysicalExpr>);
791+
792+
// Before `update_partitioned` there is no per-partition data.
assert!(dynamic_filter.partition_filter(0).unwrap().is_none());
793+
794+
dynamic_filter
795+
.update_partitioned(lit(true), vec![Some(lit(true)), None])
796+
.unwrap();
797+
798+
let partition_0 = dynamic_filter.partition_filter(0).unwrap().unwrap();
799+
insta::assert_snapshot!(format!("{partition_0:?}"), @r#"Literal { value: Boolean(true), field: Field { name: "lit", data_type: Boolean } }"#);
800+
801+
// A `None` entry is reported as a `false` literal.
let partition_1 = dynamic_filter.partition_filter(1).unwrap().unwrap();
802+
insta::assert_snapshot!(format!("{partition_1:?}"), @r#"Literal { value: Boolean(false), field: Field { name: "lit", data_type: Boolean } }"#);
803+
804+
// Index 2 is past the two stored entries.
assert!(dynamic_filter.partition_filter(2).unwrap().is_none());
805+
}
806+
807+
// Checks that `partition_filter` returns the partition expression with its
// column rewritten after the dynamic filter's columns are reassigned against
// a schema in which `a` sits at index 1 instead of 0.
#[test]
808+
fn test_partition_filter_remaps_children() {
809+
let source_schema =
810+
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
811+
let col_a = col("a", &source_schema).unwrap();
812+
let filter = Arc::new(DynamicFilterPhysicalExpr::new(
813+
vec![Arc::clone(&col_a)],
814+
lit(true) as Arc<dyn PhysicalExpr>,
815+
));
816+
817+
// Store a single partition filter `a > 10` built against the source schema.
filter
818+
.update_partitioned(
819+
lit(true),
820+
vec![Some(Arc::new(BinaryExpr::new(
821+
Arc::clone(&col_a),
822+
datafusion_expr::Operator::Gt,
823+
lit(10) as Arc<dyn PhysicalExpr>,
824+
)))],
825+
)
826+
.unwrap();
827+
828+
let reassigned_schema = Arc::new(Schema::new(vec![
829+
Field::new("b", DataType::Int32, false),
830+
Field::new("a", DataType::Int32, false),
831+
]));
832+
let reassigned = reassign_expr_columns(
833+
Arc::clone(&filter) as Arc<dyn PhysicalExpr>,
834+
&reassigned_schema,
835+
)
836+
.unwrap();
837+
let reassigned = reassigned
838+
.downcast_ref::<DynamicFilterPhysicalExpr>()
839+
.expect("Expected dynamic filter after reassignment");
840+
841+
// The snapshot expects column `a` at index 1, i.e. the remapped position.
let partition_filter = reassigned.partition_filter(0).unwrap().unwrap();
842+
insta::assert_snapshot!(format!("{partition_filter:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Gt, right: Literal { value: Int32(10), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#);
843+
}
844+
703845
#[test]
704846
fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() {
705847
let dynamic_filter =

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ use crate::repartition::REPARTITION_RANDOM_STATE;
5555
use crate::spill::get_record_batch_memory_size;
5656
use crate::{
5757
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
58-
PlanProperties, SendableRecordBatchStream, Statistics,
58+
PartitioningCompatibility, PlanProperties, SendableRecordBatchStream, Statistics,
5959
common::can_project,
6060
joins::utils::{
6161
BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
@@ -764,11 +764,25 @@ pub struct HashJoinExec {
764764
struct HashJoinExecDynamicFilter {
765765
/// Dynamic filter that we'll update with the results of the build side once that is done.
766766
filter: Arc<DynamicFilterPhysicalExpr>,
767+
/// How partitioned build-side filters should be routed on the probe side.
768+
routing_mode: DynamicFilterRoutingMode,
767769
/// Build accumulator to collect build-side information (hash maps and/or bounds) from each partition.
768770
/// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
769771
build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
770772
}
771773

774+
/// Routing strategy for partitioned dynamic filters.
///
/// Chosen by `HashJoinExec::dynamic_filter_routing_mode` from the join's
/// partition mode and the output partitioning of its two inputs.
775+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
776+
pub(crate) enum DynamicFilterRoutingMode {
777+
/// Route by hash-case expression, used when rows are hash repartitioned.
/// Selected when both inputs report `Partitioning::Hash` and their
/// partitionings compare as `PartitioningCompatibility::SamePartitionMap`.
778+
CaseHash,
779+
/// Route by partition index. Used when both sides satisfy the join's
780+
/// distribution without repartitioning and share the same partition map.
/// Selected when the inputs are not both `Partitioning::Hash` but still
/// compare as `PartitioningCompatibility::SamePartitionMap`.
781+
PartitionIndex,
782+
/// Use one global filter that safely covers all build partitions.
/// Selected when the join is not in `PartitionMode::Partitioned`, or when
/// the inputs do not compare as `PartitioningCompatibility::SamePartitionMap`.
783+
Global,
784+
}
785+
772786
impl fmt::Debug for HashJoinExec {
773787
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
774788
f.debug_struct("HashJoinExec")
@@ -842,23 +856,32 @@ impl HashJoinExec {
842856

843857
/// Returns `true` when join dynamic filter pushdown may be used: the second
/// element of `join_type.on_lr_is_preserved()` (bound as `probe_preserved`)
/// must be `true` and `config.optimizer.enable_join_dynamic_filter_pushdown`
/// must be set.
fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool {
844858
let (_, probe_preserved) = self.join_type.on_lr_is_preserved();
845-
if !probe_preserved || !config.optimizer.enable_join_dynamic_filter_pushdown {
846-
return false;
847-
}
859+
probe_preserved && config.optimizer.enable_join_dynamic_filter_pushdown
860+
}
848861

849-
// `preserve_file_partitions` can report Hash partitioning for Hive-style
850-
// file groups, but those partitions are not actually hash-distributed.
851-
// Partitioned dynamic filters rely on hash routing, so disable them in
852-
// this mode to avoid incorrect results. Follow-up work: enable dynamic
853-
// filtering for preserve_file_partitioned scans (issue #20195).
854-
// https://github.com/apache/datafusion/issues/20195
855-
if config.optimizer.preserve_file_partitions > 0
856-
&& self.mode == PartitionMode::Partitioned
857-
{
858-
return false;
862+
/// Picks how partitioned dynamic filters are routed for this join.
///
/// * Not `PartitionMode::Partitioned`: `Global`.
/// * Both inputs report `Partitioning::Hash`: `CaseHash` when their
///   partitionings compare as `SamePartitionMap`, otherwise `Global`.
/// * Any other pair of partitionings: `PartitionIndex` when they compare as
///   `SamePartitionMap`, otherwise `Global`.
fn dynamic_filter_routing_mode(&self) -> DynamicFilterRoutingMode {
863+
if self.mode != PartitionMode::Partitioned {
864+
return DynamicFilterRoutingMode::Global;
859865
}
860866

861-
true
867+
let left_partitioning = self.left.output_partitioning();
868+
let right_partitioning = self.right.output_partitioning();
869+
let compatibility = left_partitioning.compatibility(right_partitioning);
870+
871+
if matches!(
872+
(left_partitioning, right_partitioning),
873+
(Partitioning::Hash(_, _), Partitioning::Hash(_, _))
874+
) {
875+
if compatibility == PartitioningCompatibility::SamePartitionMap {
876+
DynamicFilterRoutingMode::CaseHash
877+
} else {
878+
DynamicFilterRoutingMode::Global
879+
}
880+
} else if compatibility == PartitioningCompatibility::SamePartitionMap {
881+
DynamicFilterRoutingMode::PartitionIndex
882+
} else {
883+
DynamicFilterRoutingMode::Global
884+
}
862885
}
863886

864887
/// left (build) side which gets hashed
@@ -1336,6 +1359,7 @@ impl ExecutionPlan for HashJoinExec {
13361359
filter,
13371360
on_right,
13381361
repartition_random_state,
1362+
df.routing_mode,
13391363
))
13401364
})))
13411365
})
@@ -1676,6 +1700,7 @@ impl ExecutionPlan for HashJoinExec {
16761700
.builder()
16771701
.with_dynamic_filter(Some(HashJoinExecDynamicFilter {
16781702
filter: dynamic_filter,
1703+
routing_mode: self.dynamic_filter_routing_mode(),
16791704
build_accumulator: OnceLock::new(),
16801705
}))
16811706
.build_exec()?;
@@ -2375,8 +2400,10 @@ mod tests {
23752400
NullEquality::NullEqualsNothing,
23762401
false,
23772402
)?;
2403+
let routing_mode = join.dynamic_filter_routing_mode();
23782404
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
23792405
filter: Arc::clone(&dynamic_filter),
2406+
routing_mode,
23802407
build_accumulator: OnceLock::new(),
23812408
});
23822409

0 commit comments

Comments
 (0)