Skip to content

Commit b35fd03

Browse files
committed
Fix group join optimizer CI coverage
1 parent 5fe7219 commit b35fd03

9 files changed

Lines changed: 628 additions & 85 deletions

File tree

datafusion/core/src/optimizer_rule_reference.md

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -76,18 +76,19 @@ in multiple phases.
7676
| 5 | `FilterPushdown` | pre-optimization phase | Pushes supported physical filters down toward data sources before distribution and sorting are enforced. |
7777
| 6 | `EnforceDistribution` | - | Adds repartitioning only where needed to satisfy physical distribution requirements. |
7878
| 7 | `CombinePartialFinalAggregate` | - | Collapses adjacent partial and final aggregates when the distributed shape makes them redundant. |
79-
| 8 | `EnforceSorting` | - | Adds or removes local sorts to satisfy required input orderings. |
80-
| 9 | `OptimizeAggregateOrder` | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
81-
| 10 | `WindowTopN` | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
82-
| 11 | `ProjectionPushdown` | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
83-
| 12 | `OutputRequirements` | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
84-
| 13 | `LimitAggregation` | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
85-
| 14 | `LimitPushPastWindows` | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
86-
| 15 | `HashJoinBuffering` | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
87-
| 16 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
88-
| 17 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
89-
| 18 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
90-
| 19 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
91-
| 20 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
92-
| 21 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
93-
| 22 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
79+
| 8 | `group_join` | - | Fuses eligible aggregate-over-hash-join plans when grouping keys match join keys. |
80+
| 9 | `EnforceSorting` | - | Adds or removes local sorts to satisfy required input orderings. |
81+
| 10 | `OptimizeAggregateOrder` | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
82+
| 11 | `WindowTopN` | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
83+
| 12 | `ProjectionPushdown` | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
84+
| 13 | `OutputRequirements` | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
85+
| 14 | `LimitAggregation` | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
86+
| 15 | `LimitPushPastWindows` | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
87+
| 16 | `HashJoinBuffering` | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
88+
| 17 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
89+
| 18 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
90+
| 19 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
91+
| 20 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
92+
| 21 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
93+
| 22 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
94+
| 23 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |

datafusion/physical-optimizer/src/group_join.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,14 @@ impl PhysicalOptimizerRule for GroupJoinOptimizer {
170170
.map(|(expr, name)| (Arc::clone(expr), name.clone()))
171171
.collect();
172172

173-
let group_join = GroupJoinExec::try_new(
173+
let group_join = GroupJoinExec::try_new_with_aggr_input_schema(
174174
Arc::clone(hash_join.left()),
175175
Arc::clone(hash_join.right()),
176176
join_on.to_vec(),
177177
*hash_join.join_type(),
178178
group_by_with_names,
179179
aggr_exprs.to_vec(),
180+
agg_exec.input_schema(),
180181
)?;
181182

182183
Ok(Transformed::yes(

datafusion/physical-plan/src/joins/group_join.rs

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ pub struct GroupJoinExec {
8989
group_by_exprs: Vec<(PhysicalExprRef, String)>,
9090
/// Aggregate function expressions (e.g., COUNT, SUM)
9191
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
92+
/// Input schema used to build aggregate expressions
93+
aggr_input_schema: SchemaRef,
9294
/// Output schema: group-by columns + aggregate outputs
9395
schema: SchemaRef,
9496
/// Execution metrics
@@ -108,6 +110,29 @@ impl GroupJoinExec {
108110
join_type: JoinType,
109111
group_by_exprs: Vec<(PhysicalExprRef, String)>,
110112
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
113+
) -> Result<Self> {
114+
let aggr_input_schema = right.schema();
115+
Self::try_new_with_aggr_input_schema(
116+
left,
117+
right,
118+
on,
119+
join_type,
120+
group_by_exprs,
121+
aggr_expr,
122+
aggr_input_schema,
123+
)
124+
}
125+
126+
/// Create a new `GroupJoinExec` with the schema used to build aggregate
127+
/// expressions.
128+
pub fn try_new_with_aggr_input_schema(
129+
left: Arc<dyn ExecutionPlan>,
130+
right: Arc<dyn ExecutionPlan>,
131+
on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
132+
join_type: JoinType,
133+
group_by_exprs: Vec<(PhysicalExprRef, String)>,
134+
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
135+
aggr_input_schema: SchemaRef,
111136
) -> Result<Self> {
112137
if !matches!(join_type, JoinType::Inner | JoinType::Left) {
113138
return internal_err!(
@@ -146,11 +171,47 @@ impl GroupJoinExec {
146171
join_type,
147172
group_by_exprs,
148173
aggr_expr,
174+
aggr_input_schema,
149175
schema,
150176
metrics: ExecutionPlanMetricsSet::new(),
151177
cache,
152178
})
153179
}
180+
181+
/// Build side input.
182+
pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
183+
&self.left
184+
}
185+
186+
/// Probe side input.
187+
pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
188+
&self.right
189+
}
190+
191+
/// Equi-join key expressions.
192+
pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
193+
&self.on
194+
}
195+
196+
/// Join type.
197+
pub fn join_type(&self) -> &JoinType {
198+
&self.join_type
199+
}
200+
201+
/// GROUP BY expressions with output aliases.
202+
pub fn group_by_exprs(&self) -> &[(PhysicalExprRef, String)] {
203+
&self.group_by_exprs
204+
}
205+
206+
/// Aggregate expressions.
207+
pub fn aggr_expr(&self) -> &[Arc<AggregateFunctionExpr>] {
208+
&self.aggr_expr
209+
}
210+
211+
/// Input schema used to build aggregate expressions.
212+
pub fn aggr_input_schema(&self) -> &SchemaRef {
213+
&self.aggr_input_schema
214+
}
154215
}
155216

156217
impl DisplayAs for GroupJoinExec {
@@ -200,16 +261,16 @@ impl ExecutionPlan for GroupJoinExec {
200261
self: Arc<Self>,
201262
children: Vec<Arc<dyn ExecutionPlan>>,
202263
) -> Result<Arc<dyn ExecutionPlan>> {
203-
Ok(Arc::new(GroupJoinExec::try_new(
264+
Ok(Arc::new(GroupJoinExec::try_new_with_aggr_input_schema(
204265
Arc::clone(&children[0]),
205266
Arc::clone(&children[1]),
206267
self.on.clone(),
207268
self.join_type,
208269
self.group_by_exprs.clone(),
209270
self.aggr_expr.clone(),
271+
Arc::clone(&self.aggr_input_schema),
210272
)?))
211273
}
212-
213274
fn required_input_distribution(&self) -> Vec<Distribution> {
214275
let left_exprs: Vec<PhysicalExprRef> =
215276
self.on.iter().map(|(l, _)| Arc::clone(l)).collect();

datafusion/proto/proto/datafusion.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -788,6 +788,7 @@ message PhysicalPlanNode {
788788
BufferExecNode buffer = 37;
789789
ArrowScanExecNode arrow_scan = 38;
790790
ScalarSubqueryExecNode scalar_subquery = 39;
791+
GroupJoinExecNode group_join = 40;
791792
}
792793
}
793794

@@ -1189,6 +1190,18 @@ message HashJoinExecNode {
11891190
bool null_aware = 10;
11901191
}
11911192

1193+
message GroupJoinExecNode {
1194+
PhysicalPlanNode left = 1;
1195+
PhysicalPlanNode right = 2;
1196+
repeated JoinOn on = 3;
1197+
datafusion_common.JoinType join_type = 4;
1198+
repeated PhysicalExprNode group_expr = 5;
1199+
repeated string group_expr_name = 6;
1200+
repeated PhysicalExprNode aggr_expr = 7;
1201+
repeated string aggr_expr_name = 8;
1202+
datafusion_common.Schema input_schema = 9;
1203+
}
1204+
11921205
enum StreamPartitionMode {
11931206
SINGLE_PARTITION = 0;
11941207
PARTITIONED_EXEC = 1;

0 commit comments

Comments
 (0)