Skip to content

Commit 2ad27aa

Browse files
Use LexOrdering for range partitioning
1 parent 5e9b174 commit 2ad27aa

5 files changed

Lines changed: 87 additions & 39 deletions

File tree

datafusion/physical-expr/src/partitioning.rs

Lines changed: 74 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323
};
2424
use datafusion_common::ScalarValue;
2525
use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
26-
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
26+
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
2727
use std::fmt;
2828
use std::fmt::Display;
2929
use std::sync::Arc;
@@ -149,13 +149,11 @@ impl Display for Partitioning {
149149
///
150150
/// [`RangePartitioning`] describes an ordered key space with split points.
151151
///
152-
/// - `sort_exprs` define the partitioning key and ordering.
152+
/// - `ordering` defines the partitioning key and ordering.
153153
/// - `split_points` define the boundaries between adjacent partitions.
154-
/// - The declaring source must ensure every emitted row belongs to exactly one
155-
/// declared partition and is emitted by that partition.
156154
///
157-
/// The sort expressions must be non-empty, and split points must be strictly
158-
/// ordered according to those sort expressions.
155+
/// The ordering must be non-empty, and split points must be strictly ordered
156+
/// according to that ordering.
159157
///
160158
/// `N` split points define `N + 1` partitions:
161159
///
@@ -172,7 +170,7 @@ impl Display for Partitioning {
172170
/// For a single range key:
173171
///
174172
/// ```text
175-
/// sort_exprs = [date ASC NULLS LAST]
173+
/// ordering = [date ASC NULLS LAST]
176174
/// split_points = [
177175
/// (2022-01-01),
178176
/// (2023-01-01),
@@ -184,11 +182,11 @@ impl Display for Partitioning {
184182
/// ```
185183
///
186184
/// The same model extends to compound keys.
187-
/// For `sort_exprs = [time ASC, city ASC]`, split points are ordered
185+
/// For `ordering = [time ASC, city ASC]`, split points are ordered
188186
/// lexicographically by `(time, city)`:
189187
///
190188
/// ```text
191-
/// sort_exprs = [time ASC NULLS LAST, city ASC NULLS LAST]
189+
/// ordering = [time ASC NULLS LAST, city ASC NULLS LAST]
192190
/// split_points = [
193191
/// (2022, Allston),
194192
/// (2023, Allston),
@@ -206,7 +204,7 @@ pub struct RangePartitioning {
206204
/// Ordered partitioning key. Sort options are part of the partitioning
207205
/// because `ASC`/`DESC` and null ordering decide which side of a split point
208206
/// a row belongs to.
209-
sort_exprs: Vec<PhysicalSortExpr>,
207+
ordering: LexOrdering,
210208
/// Boundaries between adjacent partitions. `N` split points define `N + 1`
211209
/// partitions as described in [`RangePartitioning`].
212210
split_points: Vec<SplitPoint>,
@@ -215,7 +213,7 @@ pub struct RangePartitioning {
215213
/// A boundary between adjacent range partitions.
216214
///
217215
/// A split point is a tuple with one [`ScalarValue`] per sort expression in the
218-
/// parent [`RangePartitioning`].
216+
/// parent [`RangePartitioning`] ordering.
219217
#[derive(Debug, Clone, PartialEq)]
220218
pub struct SplitPoint {
221219
values: Vec<ScalarValue>,
@@ -250,21 +248,21 @@ impl RangePartitioning {
250248
///
251249
/// The caller is responsible for satisfying the contract documented on
252250
/// [`RangePartitioning`]: every split point must have one value per sort
253-
/// expression, there must be at least one sort expression, and split points
254-
/// must be strictly ordered according to those sort expressions.
255-
pub fn new(sort_exprs: Vec<PhysicalSortExpr>, split_points: Vec<SplitPoint>) -> Self {
251+
/// expression in the ordering, and split points must be strictly ordered
252+
/// according to that ordering.
253+
pub fn new(ordering: LexOrdering, split_points: Vec<SplitPoint>) -> Self {
256254
Self {
257-
sort_exprs,
255+
ordering,
258256
split_points,
259257
}
260258
}
261259

262-
/// Returns the ordered expressions that define the range key.
260+
/// Returns the ordering that defines the range key.
263261
///
264262
/// Sort options are part of the partitioning because `ASC`/`DESC` and null
265263
/// ordering decide which side of a split point a row belongs to.
266-
pub fn sort_exprs(&self) -> &[PhysicalSortExpr] {
267-
&self.sort_exprs
264+
pub fn ordering(&self) -> &LexOrdering {
265+
&self.ordering
268266
}
269267

270268
/// Returns the ordered split points between partitions.
@@ -286,39 +284,38 @@ impl RangePartitioning {
286284
input_eq_properties: &EquivalenceProperties,
287285
) -> Option<Self> {
288286
let exprs = self
289-
.sort_exprs
287+
.ordering
290288
.iter()
291289
.map(|sort_expr| Arc::clone(&sort_expr.expr))
292290
.collect::<Vec<_>>();
293291
let projected_exprs = input_eq_properties
294292
.project_expressions(&exprs, mapping)
295293
.collect::<Option<Vec<_>>>()?;
296294
let sort_exprs = self
297-
.sort_exprs
295+
.ordering
298296
.iter()
299297
.zip(projected_exprs)
300298
.map(|(sort_expr, expr)| PhysicalSortExpr::new(expr, sort_expr.options))
301-
.collect();
299+
.collect::<Vec<_>>();
300+
let ordering = LexOrdering::new(sort_exprs)?;
301+
if ordering.len() != self.ordering.len() {
302+
return None;
303+
}
302304

303305
Some(Self {
304-
sort_exprs,
306+
ordering,
305307
split_points: self.split_points.clone(),
306308
})
307309
}
308310
}
309311

310312
impl Display for RangePartitioning {
311313
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
312-
let sort_exprs = self
313-
.sort_exprs
314-
.iter()
315-
.map(ToString::to_string)
316-
.collect::<Vec<_>>()
317-
.join(", ");
318314
let split_points = format_range_split_points(&self.split_points);
319315
write!(
320316
f,
321-
"Range([{sort_exprs}], [{}], {})",
317+
"Range([{}], [{}], {})",
318+
self.ordering,
322319
split_points,
323320
self.partition_count()
324321
)
@@ -335,7 +332,7 @@ fn format_range_split_points(split_points: &[SplitPoint]) -> String {
335332

336333
impl PartialEq for RangePartitioning {
337334
fn eq(&self, other: &Self) -> bool {
338-
self.sort_exprs == other.sort_exprs && self.split_points == other.split_points
335+
self.ordering == other.ordering && self.split_points == other.split_points
339336
}
340337
}
341338

@@ -565,6 +562,7 @@ mod tests {
565562

566563
use super::*;
567564
use crate::expressions::Column;
565+
use crate::projection::ProjectionTargets;
568566

569567
use arrow::compute::SortOptions;
570568
use arrow::datatypes::{DataType, Field, Schema};
@@ -1072,10 +1070,10 @@ mod tests {
10721070
Arc::new(Column::new_with_schema("a", &schema)?);
10731071

10741072
let range_partitioning = RangePartitioning::new(
1075-
vec![PhysicalSortExpr::new_default(Arc::clone(&col_a))],
1073+
[PhysicalSortExpr::new_default(Arc::clone(&col_a))].into(),
10761074
vec![int_split_point([10]), int_split_point([20])],
10771075
);
1078-
assert_eq!(range_partitioning.sort_exprs()[0].to_string(), "a@0 ASC");
1076+
assert_eq!(range_partitioning.ordering()[0].to_string(), "a@0 ASC");
10791077
assert_eq!(
10801078
range_partitioning.split_points(),
10811079
&[int_split_point([10]), int_split_point([20])]
@@ -1101,7 +1099,7 @@ mod tests {
11011099
Arc::new(Column::new_with_schema("b", &schema)?);
11021100
let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
11031101
let range_partitioning = Partitioning::Range(RangePartitioning::new(
1104-
vec![PhysicalSortExpr::new(col_b, SortOptions::new(true, false))],
1102+
[PhysicalSortExpr::new(col_b, SortOptions::new(true, false))].into(),
11051103
vec![int_split_point([10])],
11061104
));
11071105

@@ -1122,6 +1120,46 @@ mod tests {
11221120
Ok(())
11231121
}
11241122

1123+
#[test]
1124+
fn test_range_partitioning_project_degrades_if_ordering_collapses() -> Result<()> {
1125+
let schema = Arc::new(Schema::new(vec![
1126+
Field::new("a", DataType::Int64, false),
1127+
Field::new("b", DataType::Int64, false),
1128+
]));
1129+
let col_a: Arc<dyn PhysicalExpr> =
1130+
Arc::new(Column::new_with_schema("a", &schema)?);
1131+
let col_b: Arc<dyn PhysicalExpr> =
1132+
Arc::new(Column::new_with_schema("b", &schema)?);
1133+
let target: Arc<dyn PhysicalExpr> = Arc::new(Column::new("x", 0));
1134+
let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1135+
let range_partitioning = Partitioning::Range(RangePartitioning::new(
1136+
[
1137+
PhysicalSortExpr::new_default(Arc::clone(&col_a)),
1138+
PhysicalSortExpr::new_default(Arc::clone(&col_b)),
1139+
]
1140+
.into(),
1141+
vec![int_split_point([10, 100])],
1142+
));
1143+
let mapping = ProjectionMapping::from_iter([
1144+
(
1145+
Arc::clone(&col_a),
1146+
ProjectionTargets::from(vec![(Arc::clone(&target), 0)]),
1147+
),
1148+
(
1149+
Arc::clone(&col_b),
1150+
ProjectionTargets::from(vec![(Arc::clone(&target), 0)]),
1151+
),
1152+
]);
1153+
1154+
let projected = range_partitioning.project(&mapping, &eq_properties);
1155+
let Partitioning::UnknownPartitioning(partition_count) = projected else {
1156+
panic!("expected UnknownPartitioning, got {projected:?}");
1157+
};
1158+
assert_eq!(partition_count, 2);
1159+
1160+
Ok(())
1161+
}
1162+
11251163
#[test]
11261164
fn test_multi_partition_range_does_not_satisfy_hash_distribution() -> Result<()> {
11271165
let schema = Arc::new(Schema::new(vec![
@@ -1135,10 +1173,11 @@ mod tests {
11351173

11361174
let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
11371175
let range_partitioning = Partitioning::Range(RangePartitioning::new(
1138-
vec![
1176+
[
11391177
PhysicalSortExpr::new_default(Arc::clone(&col_a)),
11401178
PhysicalSortExpr::new_default(Arc::clone(&col_b)),
1141-
],
1179+
]
1180+
.into(),
11421181
vec![int_split_point([10, 100])],
11431182
));
11441183
let required = Distribution::HashPartitioned(vec![col_a, col_b]);

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2326,7 +2326,7 @@ mod tests {
23262326
let expr = col("my_awesome_field", &schema)?;
23272327
let input = MockExec::new(vec![Ok(batch)], Arc::clone(&schema));
23282328
let partitioning = Partitioning::Range(RangePartitioning::new(
2329-
vec![PhysicalSortExpr::new_default(expr)],
2329+
[PhysicalSortExpr::new_default(expr)].into(),
23302330
vec![SplitPoint::new(vec![ScalarValue::Utf8(Some(
23312331
"foo".to_string(),
23322332
))])],

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -667,13 +667,22 @@ fn parse_protobuf_range_partitioning(
667667
input_schema,
668668
proto_converter,
669669
)?;
670+
let sort_expr_count = sort_exprs.len();
671+
let ordering = LexOrdering::new(sort_exprs).ok_or_else(|| {
672+
internal_datafusion_err!("Range partitioning requires non-empty ordering")
673+
})?;
674+
if ordering.len() != sort_expr_count {
675+
return Err(internal_datafusion_err!(
676+
"Range partitioning ordering must not contain duplicate expressions"
677+
));
678+
}
670679
let split_points = range_partitioning
671680
.split_point
672681
.iter()
673682
.map(parse_protobuf_range_split_point)
674683
.collect::<Result<_>>()?;
675684
Ok(Partitioning::Range(RangePartitioning::new(
676-
sort_exprs,
685+
ordering,
677686
split_points,
678687
)))
679688
}

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ fn serialize_range_partitioning(
644644
) -> Result<protobuf::PhysicalRangePartitioning> {
645645
Ok(protobuf::PhysicalRangePartitioning {
646646
sort_expr: serialize_physical_sort_exprs(
647-
range.sort_exprs().iter().cloned(),
647+
range.ordering().iter().cloned(),
648648
codec,
649649
proto_converter,
650650
)?,

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1808,7 +1808,7 @@ fn roundtrip_range_partitioning() -> Result<()> {
18081808
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
18091809
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
18101810
let range_partitioning = Partitioning::Range(RangePartitioning::new(
1811-
vec![PhysicalSortExpr::new_default(col("a", &schema)?)],
1811+
[PhysicalSortExpr::new_default(col("a", &schema)?)].into(),
18121812
vec![SplitPoint::new(vec![ScalarValue::Int64(Some(10))])],
18131813
));
18141814
// RepartitionExec is used only to carry the partitioning through proto.

0 commit comments

Comments
 (0)