Skip to content

Commit 5ff9fa5

Browse files
Introduce custom partitioning trait
1 parent 30d1d06 commit 5ff9fa5

9 files changed

Lines changed: 179 additions & 44 deletions

File tree

datafusion/ffi/src/physical_expr/partitioning.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ impl From<&Partitioning> for FFI_Partitioning {
4646
Self::Hash(exprs, *size)
4747
}
4848
Partitioning::UnknownPartitioning(size) => Self::UnknownPartitioning(*size),
49+
Partitioning::Custom(custom) => {
50+
Self::UnknownPartitioning(custom.partition_count())
51+
}
4952
}
5053
}
5154
}

datafusion/physical-expr/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ pub use equivalence::{
5959
AcrossPartitions, ConstExpr, EquivalenceProperties, calculate_union,
6060
};
6161
pub use partitioning::{
62-
Distribution, PartitionRange, Partitioning, PartitioningCompatibility, RangeBound,
63-
RangePartitioning,
62+
Distribution, PartitionRange, Partitioning, PartitioningCompatibility,
63+
PhysicalPartitioning, RangeBound, RangePartitioning,
6464
};
6565
pub use physical_expr::{
6666
add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering,

datafusion/physical-expr/src/partitioning.rs

Lines changed: 151 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::{
2323
};
2424
use datafusion_common::{Result, ScalarValue, plan_err};
2525
use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
26+
use std::any::Any;
2627
use std::fmt;
2728
use std::fmt::Display;
2829
use std::sync::Arc;
@@ -118,16 +119,50 @@ pub enum Partitioning {
118119
/// Allocate rows based on a hash of one of more expressions and the specified number of
119120
/// partitions
120121
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
121-
/// Allocate rows based on disjoint ordered ranges of one or more expressions.
122-
///
123-
/// This variant describes partitioning that already exists in an input source.
124-
/// It does not imply DataFusion can repartition arbitrary input into these
125-
/// ranges.
126-
Range(RangePartitioning),
122+
/// Custom partitioning scheme backed by a trait object.
123+
Custom(Arc<dyn PhysicalPartitioning>),
127124
/// Unknown partitioning scheme with a known number of partitions
128125
UnknownPartitioning(usize),
129126
}
130127

128+
/// Extensible physical partitioning contract.
129+
///
130+
/// This is a POC shape for partitioning schemes that are not built in to the
131+
/// [`Partitioning`] enum. It intentionally mirrors the operations DataFusion
132+
/// already asks of partitioning metadata.
133+
pub trait PhysicalPartitioning: fmt::Debug + Display + Send + Sync {
134+
/// Used for display and downcasting-friendly diagnostics.
135+
fn name(&self) -> &str;
136+
137+
/// Number of output partitions.
138+
fn partition_count(&self) -> usize;
139+
140+
/// Returns how this partitioning satisfies a required [`Distribution`].
141+
fn satisfaction(
142+
&self,
143+
required: &Distribution,
144+
eq_properties: &EquivalenceProperties,
145+
allow_subset: bool,
146+
) -> PartitioningSatisfaction;
147+
148+
/// Returns whether this partitioning describes the same logical partition
149+
/// map as `other`.
150+
fn compatibility(
151+
&self,
152+
other: &dyn PhysicalPartitioning,
153+
) -> PartitioningCompatibility;
154+
155+
/// Calculate the output partitioning after applying a projection.
156+
fn project(
157+
&self,
158+
mapping: &ProjectionMapping,
159+
input_eq_properties: &EquivalenceProperties,
160+
) -> Arc<dyn PhysicalPartitioning>;
161+
162+
/// Downcast hook for implementations that understand each other.
163+
fn as_any(&self) -> &dyn Any;
164+
}
165+
131166
impl Display for Partitioning {
132167
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
133168
match self {
@@ -140,7 +175,7 @@ impl Display for Partitioning {
140175
.join(", ");
141176
write!(f, "Hash([{phy_exprs_str}], {size})")
142177
}
143-
Partitioning::Range(range) => write!(f, "{range}"),
178+
Partitioning::Custom(custom) => write!(f, "{custom}"),
144179
Partitioning::UnknownPartitioning(size) => {
145180
write!(f, "UnknownPartitioning({size})")
146181
}
@@ -204,7 +239,7 @@ impl RangePartitioning {
204239
/// the required expressions are colocated in one partition. The routing is
205240
/// range-based rather than hash-based, but the distribution property is the
206241
/// same property hash joins and grouped aggregations require.
207-
pub fn satisfaction(
242+
pub fn hash_distribution_satisfaction(
208243
&self,
209244
required_exprs: &[Arc<dyn PhysicalExpr>],
210245
eq_properties: &EquivalenceProperties,
@@ -220,7 +255,7 @@ impl RangePartitioning {
220255

221256
/// Returns whether this range partitioning has the same partition map as
222257
/// another range partitioning.
223-
pub fn compatibility(&self, other: &Self) -> PartitioningCompatibility {
258+
pub fn compatibility_with_range(&self, other: &Self) -> PartitioningCompatibility {
224259
if !physical_exprs_equal(&self.exprs, &other.exprs) {
225260
return PartitioningCompatibility::Incompatible;
226261
}
@@ -232,7 +267,7 @@ impl RangePartitioning {
232267
}
233268
}
234269

235-
fn project(
270+
fn project_with_mapping(
236271
&self,
237272
mapping: &ProjectionMapping,
238273
input_eq_properties: &EquivalenceProperties,
@@ -269,6 +304,69 @@ impl Display for RangePartitioning {
269304
}
270305
}
271306

307+
impl PhysicalPartitioning for RangePartitioning {
308+
fn name(&self) -> &str {
309+
"range"
310+
}
311+
312+
fn partition_count(&self) -> usize {
313+
self.ranges.len()
314+
}
315+
316+
fn satisfaction(
317+
&self,
318+
required: &Distribution,
319+
eq_properties: &EquivalenceProperties,
320+
allow_subset: bool,
321+
) -> PartitioningSatisfaction {
322+
match required {
323+
Distribution::UnspecifiedDistribution => PartitioningSatisfaction::Exact,
324+
Distribution::SinglePartition if self.partition_count() == 1 => {
325+
PartitioningSatisfaction::Exact
326+
}
327+
Distribution::HashPartitioned(_) if self.partition_count() == 1 => {
328+
PartitioningSatisfaction::Exact
329+
}
330+
Distribution::HashPartitioned(required_exprs) => {
331+
RangePartitioning::hash_distribution_satisfaction(
332+
self,
333+
required_exprs,
334+
eq_properties,
335+
allow_subset,
336+
)
337+
}
338+
_ => PartitioningSatisfaction::NotSatisfied,
339+
}
340+
}
341+
342+
fn compatibility(
343+
&self,
344+
other: &dyn PhysicalPartitioning,
345+
) -> PartitioningCompatibility {
346+
other
347+
.as_any()
348+
.downcast_ref::<RangePartitioning>()
349+
.map(|other| self.compatibility_with_range(other))
350+
.unwrap_or(PartitioningCompatibility::Incompatible)
351+
}
352+
353+
fn project(
354+
&self,
355+
mapping: &ProjectionMapping,
356+
input_eq_properties: &EquivalenceProperties,
357+
) -> Arc<dyn PhysicalPartitioning> {
358+
Arc::new(RangePartitioning::project_with_mapping(
359+
self,
360+
mapping,
361+
input_eq_properties,
362+
))
363+
}
364+
365+
fn as_any(&self) -> &dyn Any {
366+
self
367+
}
368+
}
369+
272370
/// A single partition's lexicographic value range.
273371
#[derive(Debug, Clone, PartialEq, Eq)]
274372
pub struct PartitionRange {
@@ -420,12 +518,38 @@ impl PartitioningSatisfaction {
420518
}
421519

422520
impl Partitioning {
521+
/// Create a [`Partitioning`] from a custom partitioning implementation.
522+
pub fn custom(partitioning: Arc<dyn PhysicalPartitioning>) -> Self {
523+
Self::Custom(partitioning)
524+
}
525+
526+
/// Create a [`Partitioning`] from range partitioning metadata.
527+
pub fn range(range: RangePartitioning) -> Self {
528+
Self::Custom(Arc::new(range))
529+
}
530+
531+
/// Returns the custom partitioning implementation, if this partitioning is
532+
/// backed by one.
533+
pub fn as_custom(&self) -> Option<&Arc<dyn PhysicalPartitioning>> {
534+
match self {
535+
Self::Custom(custom) => Some(custom),
536+
_ => None,
537+
}
538+
}
539+
540+
/// Returns range partitioning metadata when this custom partitioning is a
541+
/// [`RangePartitioning`].
542+
pub fn as_range(&self) -> Option<&RangePartitioning> {
543+
self.as_custom()
544+
.and_then(|custom| custom.as_any().downcast_ref::<RangePartitioning>())
545+
}
546+
423547
/// Returns the number of partitions in this partitioning scheme
424548
pub fn partition_count(&self) -> usize {
425549
use Partitioning::*;
426550
match self {
427551
RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
428-
Range(range) => range.partition_count(),
552+
Custom(custom) => custom.partition_count(),
429553
}
430554
}
431555

@@ -466,11 +590,8 @@ impl Partitioning {
466590
eq_properties,
467591
allow_subset,
468592
),
469-
// Range partitioning satisfies hash distribution requirements
470-
// because the requirement only needs equal key values to be
471-
// colocated in a partition. It does not require hash routing.
472-
Partitioning::Range(range) => {
473-
range.satisfaction(required_exprs, eq_properties, allow_subset)
593+
Partitioning::Custom(custom) => {
594+
custom.satisfaction(required, eq_properties, allow_subset)
474595
}
475596
_ => PartitioningSatisfaction::NotSatisfied,
476597
},
@@ -489,8 +610,8 @@ impl Partitioning {
489610
project_partition_exprs(exprs, mapping, input_eq_properties),
490611
*part,
491612
),
492-
Partitioning::Range(range) => {
493-
Partitioning::Range(range.project(mapping, input_eq_properties))
613+
Partitioning::Custom(custom) => {
614+
Partitioning::Custom(custom.project(mapping, input_eq_properties))
494615
}
495616
_ => self.clone(),
496617
}
@@ -508,8 +629,8 @@ impl Partitioning {
508629
{
509630
PartitioningCompatibility::SamePartitionMap
510631
}
511-
(Partitioning::Range(left), Partitioning::Range(right)) => {
512-
left.compatibility(right)
632+
(Partitioning::Custom(left), Partitioning::Custom(right)) => {
633+
left.compatibility(right.as_ref())
513634
}
514635
(Partitioning::UnknownPartitioning(_), _)
515636
| (_, Partitioning::UnknownPartitioning(_)) => {
@@ -608,8 +729,9 @@ impl PartialEq for Partitioning {
608729
{
609730
true
610731
}
611-
(Partitioning::Range(range1), Partitioning::Range(range2))
612-
if range1 == range2 =>
732+
(Partitioning::Custom(left), Partitioning::Custom(right))
733+
if left.compatibility(right.as_ref())
734+
== PartitioningCompatibility::SamePartitionMap =>
613735
{
614736
true
615737
}
@@ -797,7 +919,7 @@ mod tests {
797919
Arc::new(Column::new_with_schema("b", &schema)?);
798920
let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
799921

800-
let range = Partitioning::Range(RangePartitioning::try_new(
922+
let range = Partitioning::range(RangePartitioning::try_new(
801923
vec![Arc::clone(&col_a)],
802924
vec![
803925
PartitionRange::new(
@@ -867,7 +989,7 @@ mod tests {
867989

868990
let col_b: Arc<dyn PhysicalExpr> =
869991
Arc::new(Column::new_with_schema("b", &schema)?);
870-
let range = Partitioning::Range(RangePartitioning::try_new(
992+
let range = Partitioning::range(RangePartitioning::try_new(
871993
vec![Arc::clone(&col_b)],
872994
vec![PartitionRange::unbounded()],
873995
)?);
@@ -878,7 +1000,7 @@ mod tests {
8781000
let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
8791001

8801002
let projected = range.project(&projection, &eq_properties);
881-
let expected = Partitioning::Range(RangePartitioning::try_new(
1003+
let expected = Partitioning::range(RangePartitioning::try_new(
8821004
vec![Arc::new(Column::new("b", 0))],
8831005
vec![PartitionRange::unbounded()],
8841006
)?);
@@ -931,19 +1053,19 @@ mod tests {
9311053
RangePartitioning::try_new(vec![col_b], vec![PartitionRange::unbounded()])?;
9321054

9331055
assert_eq!(
934-
range_a_10.compatibility(&range_a_10_again),
1056+
range_a_10.compatibility_with_range(&range_a_10_again),
9351057
PartitioningCompatibility::SamePartitionMap,
9361058
);
9371059
assert_eq!(
938-
range_a_10.compatibility(&range_a_20),
1060+
range_a_10.compatibility_with_range(&range_a_20),
9391061
PartitioningCompatibility::SameExpressionsDifferentBounds,
9401062
);
9411063
assert_eq!(
942-
range_a_10.compatibility(&range_b_10),
1064+
range_a_10.compatibility_with_range(&range_b_10),
9431065
PartitioningCompatibility::Incompatible,
9441066
);
9451067
assert_eq!(
946-
Partitioning::Range(range_a_10)
1068+
Partitioning::range(range_a_10)
9471069
.compatibility(&Partitioning::UnknownPartitioning(2)),
9481070
PartitioningCompatibility::Unknown,
9491071
);

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,8 @@ fn reorder_current_join_keys(
696696
result => result,
697697
}
698698
}
699-
(Some(Partitioning::Range(left_range)), _) => {
699+
(Some(left), _) if left.as_range().is_some() => {
700+
let left_range = left.as_range().expect("checked above");
700701
match try_reorder(join_keys, left_range.exprs(), left_equivalence_properties)
701702
{
702703
(join_keys, None) => reorder_current_join_keys(
@@ -712,7 +713,8 @@ fn reorder_current_join_keys(
712713
(_, Some(Partitioning::Hash(right_exprs, _))) => {
713714
try_reorder(join_keys, right_exprs, right_equivalence_properties)
714715
}
715-
(_, Some(Partitioning::Range(right_range))) => {
716+
(_, Some(right)) if right.as_range().is_some() => {
717+
let right_range = right.as_range().expect("checked above");
716718
try_reorder(join_keys, right_range.exprs(), right_equivalence_properties)
717719
}
718720
_ => (join_keys, None),

datafusion/physical-plan/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use datafusion_physical_expr::PhysicalSortExpr;
3838
pub use datafusion_physical_expr::window::WindowExpr;
3939
pub use datafusion_physical_expr::{
4040
Distribution, PartitionRange, Partitioning, PartitioningCompatibility, PhysicalExpr,
41-
RangeBound, RangePartitioning, expressions,
41+
PhysicalPartitioning, RangeBound, RangePartitioning, expressions,
4242
};
4343

4444
pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1318,7 +1318,7 @@ impl ExecutionPlan for RepartitionExec {
13181318
new_properties.partitioning = match new_properties.partitioning {
13191319
RoundRobinBatch(_) => RoundRobinBatch(target_partitions),
13201320
Hash(hash, _) => Hash(hash, target_partitions),
1321-
Range(range) => Range(range),
1321+
Custom(custom) => Custom(custom),
13221322
UnknownPartitioning(_) => UnknownPartitioning(target_partitions),
13231323
};
13241324
Ok(Some(Arc::new(Self {
@@ -1339,8 +1339,8 @@ impl RepartitionExec {
13391339
input: Arc<dyn ExecutionPlan>,
13401340
partitioning: Partitioning,
13411341
) -> Result<Self> {
1342-
if matches!(partitioning, Partitioning::Range(_)) {
1343-
return plan_err!("RepartitionExec does not support range repartitioning");
1342+
if matches!(partitioning, Partitioning::Custom(_)) {
1343+
return plan_err!("RepartitionExec does not support custom repartitioning");
13441344
}
13451345

13461346
let preserve_order = false;

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,7 @@ fn parse_protobuf_range_partitioning(
668668
.map(parse_partition_range)
669669
.collect::<Result<Vec<_>>>()?;
670670

671-
Ok(Partitioning::Range(RangePartitioning::try_new(
671+
Ok(Partitioning::range(RangePartitioning::try_new(
672672
exprs, ranges,
673673
)?))
674674
}

0 commit comments

Comments
 (0)