Skip to content

Commit 4bac1cf

Browse files
authored
impl ser/de for preserve_order in RepartitionExec (#20798)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #20797 ## Rationale for this change - see #20797 ## What changes are included in this PR? impl ser/de for preserve_order in RepartitionExec ## Are these changes tested? add one test case ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 2589fa8 commit 4bac1cf

File tree

5 files changed

+56
-4
lines changed

5 files changed

+56
-4
lines changed

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1353,6 +1353,7 @@ message RepartitionExecNode{
13531353
// uint64 unknown = 4;
13541354
// }
13551355
Partitioning partitioning = 5;
1356+
bool preserve_order = 6;
13561357
}
13571358

13581359
message Partitioning {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,10 +1011,11 @@ impl protobuf::PhysicalPlanNode {
10111011
codec,
10121012
proto_converter,
10131013
)?;
1014-
Ok(Arc::new(RepartitionExec::try_new(
1015-
input,
1016-
partitioning.unwrap(),
1017-
)?))
1014+
let mut repart_exec = RepartitionExec::try_new(input, partitioning.unwrap())?;
1015+
if repart.preserve_order {
1016+
repart_exec = repart_exec.with_preserve_order();
1017+
}
1018+
Ok(Arc::new(repart_exec))
10181019
}
10191020

10201021
fn try_into_global_limit_physical_plan(
@@ -3056,6 +3057,7 @@ impl protobuf::PhysicalPlanNode {
30563057
protobuf::RepartitionExecNode {
30573058
input: Some(Box::new(input)),
30583059
partitioning: Some(pb_partitioning),
3060+
preserve_order: exec.preserve_order(),
30593061
},
30603062
))),
30613063
})

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1781,6 +1781,35 @@ fn roundtrip_union() -> Result<()> {
17811781
roundtrip_test(union)
17821782
}
17831783

1784+
#[test]
1785+
fn roundtrip_repartition_preserve_order() -> Result<()> {
1786+
let field_a = Field::new("a", DataType::Int64, false);
1787+
let schema = Arc::new(Schema::new(vec![field_a]));
1788+
let sort_exprs: LexOrdering = [PhysicalSortExpr {
1789+
expr: col("a", &schema)?,
1790+
options: SortOptions::default(),
1791+
}]
1792+
.into();
1793+
1794+
// Create two sorted single-partition inputs, then union them to get
1795+
// a sorted input with 2 partitions.
1796+
let source1 = SortExec::new(
1797+
sort_exprs.clone(),
1798+
Arc::new(EmptyExec::new(Arc::clone(&schema))),
1799+
);
1800+
let source2 = SortExec::new(sort_exprs, Arc::new(EmptyExec::new(schema)));
1801+
let union = UnionExec::try_new(vec![
1802+
Arc::new(source1) as Arc<dyn ExecutionPlan>,
1803+
Arc::new(source2) as Arc<dyn ExecutionPlan>,
1804+
])?;
1805+
1806+
let repartition = RepartitionExec::try_new(union, Partitioning::RoundRobinBatch(10))?
1807+
.with_preserve_order();
1808+
assert!(repartition.preserve_order());
1809+
1810+
roundtrip_test(Arc::new(repartition))
1811+
}
1812+
17841813
#[test]
17851814
fn roundtrip_interleave() -> Result<()> {
17861815
let field_a = Field::new("col", DataType::Int64, false);

0 commit comments

Comments
 (0)