Skip to content

Commit 42fa31f

Browse files
committed
Skip distribution for plans with ScalarSubqueryExec
Physical scalar subqueries (apache/datafusion#21240) cannot be distributed yet because ScalarSubqueryExpr serialization requires context from the surrounding ScalarSubqueryExec.
1 parent dbf2e05 commit 42fa31f

2 files changed

Lines changed: 18 additions & 0 deletions

File tree

src/distributed_planner/distributed_physical_optimizer_rule.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use datafusion::config::ConfigOptions;
1111
use datafusion::error::DataFusionError;
1212
use datafusion::physical_optimizer::PhysicalOptimizerRule;
1313
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
14+
use datafusion::physical_plan::scalar_subquery::ScalarSubqueryExec;
1415
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
1516
use std::fmt::Debug;
1617
use std::ops::AddAssign;
@@ -39,6 +40,16 @@ use super::insert_broadcast::insert_broadcast_execs;
3940
#[derive(Debug, Default)]
4041
pub struct DistributedPhysicalOptimizerRule;
4142

43+
/// Checks if the plan tree contains a [ScalarSubqueryExec] node.
44+
fn contains_scalar_subquery_exec(plan: &Arc<dyn ExecutionPlan>) -> bool {
45+
if plan.downcast_ref::<ScalarSubqueryExec>().is_some() {
46+
return true;
47+
}
48+
plan.children()
49+
.iter()
50+
.any(|c| contains_scalar_subquery_exec(c))
51+
}
52+
4253
impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
4354
fn optimize(
4455
&self,
@@ -49,6 +60,11 @@ impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
4960
return Ok(original);
5061
}
5162

63+
// Skip distribution for plans with scalar subqueries; serialization not yet supported (https://github.com/apache/datafusion/pull/21240)
64+
if contains_scalar_subquery_exec(&original) {
65+
return Ok(original);
66+
}
67+
5268
let mut plan = Arc::clone(&original);
5369
if original.output_partitioning().partition_count() > 1 {
5470
plan = Arc::new(CoalescePartitionsExec::new(plan))

src/metrics/task_metrics_collector.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,8 @@ mod tests {
310310
.await;
311311
}
312312

313+
/// Skipped: scalar subquery distribution not yet supported (https://github.com/apache/datafusion/pull/21240)
314+
#[ignore]
313315
#[tokio::test]
314316
async fn test_metrics_collection_e2e_3() {
315317
run_metrics_collection_e2e_test(

0 commit comments

Comments
 (0)