Skip to content

Commit 9f3e7dc

Browse files
Add Fetch Property to OutputRequirementExec (apache#16892)
* ore fetch
* Update output_requirements.rs
1 parent 939d3c5 commit 9f3e7dc

3 files changed

Lines changed: 41 additions & 6 deletions

File tree

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -989,6 +989,7 @@ async fn test_soft_hard_requirements_with_multiple_soft_requirements_and_output_
989989
bounded_window2,
990990
Some(OrderingRequirements::new(requirement)),
991991
Distribution::SinglePartition,
992+
None,
992993
));
993994

994995
let expected_input = [

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,7 @@ fn test_output_req_after_projection() -> Result<()> {
712712
Arc::new(Column::new("a", 0)),
713713
Arc::new(Column::new("b", 1)),
714714
]),
715+
None,
715716
));
716717
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
717718
vec![

datafusion/physical-optimizer/src/output_requirements.rs

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use datafusion_common::{Result, Statistics};
3232
use datafusion_execution::TaskContext;
3333
use datafusion_physical_expr::Distribution;
3434
use datafusion_physical_expr_common::sort_expr::OrderingRequirements;
35+
use datafusion_physical_plan::execution_plan::Boundedness;
3536
use datafusion_physical_plan::projection::{
3637
make_with_child, update_expr, update_ordering_requirement, ProjectionExec,
3738
};
@@ -98,20 +99,23 @@ pub struct OutputRequirementExec {
9899
order_requirement: Option<OrderingRequirements>,
99100
dist_requirement: Distribution,
100101
cache: PlanProperties,
102+
fetch: Option<usize>,
101103
}
102104

103105
impl OutputRequirementExec {
104106
pub fn new(
105107
input: Arc<dyn ExecutionPlan>,
106108
requirements: Option<OrderingRequirements>,
107109
dist_requirement: Distribution,
110+
fetch: Option<usize>,
108111
) -> Self {
109-
let cache = Self::compute_properties(&input);
112+
let cache = Self::compute_properties(&input, &fetch);
110113
Self {
111114
input,
112115
order_requirement: requirements,
113116
dist_requirement,
114117
cache,
118+
fetch,
115119
}
116120
}
117121

@@ -120,14 +124,28 @@ impl OutputRequirementExec {
120124
}
121125

122126
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
123-
fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
127+
fn compute_properties(
128+
input: &Arc<dyn ExecutionPlan>,
129+
fetch: &Option<usize>,
130+
) -> PlanProperties {
131+
let boundedness = if fetch.is_some() {
132+
Boundedness::Bounded
133+
} else {
134+
input.boundedness()
135+
};
136+
124137
PlanProperties::new(
125138
input.equivalence_properties().clone(), // Equivalence Properties
126139
input.output_partitioning().clone(), // Output Partitioning
127140
input.pipeline_behavior(), // Pipeline Behavior
128-
input.boundedness(), // Boundedness
141+
boundedness, // Boundedness
129142
)
130143
}
144+
145+
/// Returns the fetch value stored on this operator, or `None` if no fetch was set.
146+
pub fn fetch(&self) -> Option<usize> {
147+
self.fetch
148+
}
131149
}
132150

133151
impl DisplayAs for OutputRequirementExec {
@@ -214,6 +232,7 @@ impl ExecutionPlan for OutputRequirementExec {
214232
children.remove(0), // has a single child
215233
self.order_requirement.clone(),
216234
self.dist_requirement.clone(),
235+
self.fetch,
217236
)))
218237
}
219238

@@ -273,10 +292,14 @@ impl ExecutionPlan for OutputRequirementExec {
273292
};
274293

275294
make_with_child(projection, &self.input()).map(|input| {
276-
let e = OutputRequirementExec::new(input, requirements, dist_req);
295+
let e = OutputRequirementExec::new(input, requirements, dist_req, self.fetch);
277296
Some(Arc::new(e) as _)
278297
})
279298
}
299+
300+
fn fetch(&self) -> Option<usize> {
301+
self.fetch
302+
}
280303
}
281304

282305
impl PhysicalOptimizerRule for OutputRequirements {
@@ -323,6 +346,7 @@ fn require_top_ordering(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn Executio
323346
// there is no ordering requirement
324347
None,
325348
Distribution::UnspecifiedDistribution,
349+
None,
326350
)) as _)
327351
}
328352
}
@@ -341,20 +365,29 @@ fn require_top_ordering_helper(
341365
// In case of constant columns, output ordering of the `SortExec` would
342366
// be an empty set. Therefore, we check the sort expression field to
343367
// assign the requirements.
344-
let req_ordering = sort_exec.expr();
345368
let req_dist = sort_exec.required_input_distribution().swap_remove(0);
369+
let req_ordering = sort_exec.expr();
346370
let reqs = OrderingRequirements::from(req_ordering.clone());
371+
let fetch = sort_exec.fetch();
372+
347373
Ok((
348-
Arc::new(OutputRequirementExec::new(plan, Some(reqs), req_dist)) as _,
374+
Arc::new(OutputRequirementExec::new(
375+
plan,
376+
Some(reqs),
377+
req_dist,
378+
fetch,
379+
)) as _,
349380
true,
350381
))
351382
} else if let Some(spm) = plan.as_any().downcast_ref::<SortPreservingMergeExec>() {
352383
let reqs = OrderingRequirements::from(spm.expr().clone());
384+
let fetch = spm.fetch();
353385
Ok((
354386
Arc::new(OutputRequirementExec::new(
355387
plan,
356388
Some(reqs),
357389
Distribution::SinglePartition,
390+
fetch,
358391
)) as _,
359392
true,
360393
))

0 commit comments

Comments
 (0)