Skip to content

Commit fb1e7d4

Browse files
committed
fix(pipeline): async phases read max_passes from config instead of hardcoding 8
1 parent 067ba4b commit fb1e7d4

10 files changed

Lines changed: 1116 additions & 137 deletions

File tree

datafusion/core/src/execution/context/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -749,10 +749,9 @@ impl SessionContext {
749749
Arc::clone(self.state().config().options()),
750750
)
751751
.without_query_execution_start_time();
752-
let plan = self.state().optimizer().optimize(
752+
let plan = self.state().optimize_with_config(
753753
Arc::unwrap_or_clone(input),
754754
&optimizer_context,
755-
|_1, _2| {},
756755
)?;
757756
self.state
758757
.write()

datafusion/core/src/execution/session_state.rs

Lines changed: 214 additions & 132 deletions
Large diffs are not rendered by default.

datafusion/optimizer/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,13 @@ datafusion-physical-expr = { workspace = true }
6161
indexmap = { workspace = true }
6262
itertools = { workspace = true }
6363
log = { workspace = true }
64+
async-trait = { workspace = true }
6465
recursive = { workspace = true, optional = true }
6566
regex = { workspace = true }
6667
regex-syntax = "0.8.9"
6768

6869
[dev-dependencies]
69-
async-trait = { workspace = true }
70+
7071
criterion = { workspace = true }
7172
ctor = { workspace = true }
7273
datafusion-functions-aggregate = { workspace = true }

datafusion/optimizer/src/analyzer/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use std::fmt::Debug;
2121
use std::sync::Arc;
2222

23+
use async_trait::async_trait;
2324
use log::debug;
2425

2526
use datafusion_common::Result;
@@ -61,6 +62,27 @@ pub trait AnalyzerRule: Debug {
6162
fn name(&self) -> &str;
6263
}
6364

65+
/// Like [`AnalyzerRule`], but may perform async operations (e.g. remote catalog
66+
/// lookups, schema resolution) during analysis.
67+
///
68+
/// Async analyzer rules run as a pre-analysis phase in
69+
/// [`SessionState::create_physical_plan`], before the synchronous [`Analyzer`].
70+
/// By default no rules are registered, so the phase is a no-op.
71+
///
72+
/// [`SessionState::create_physical_plan`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.create_physical_plan
73+
#[async_trait]
74+
pub trait AsyncAnalyzerRule: Debug + Send + Sync {
75+
/// Rewrite `plan`, possibly performing async I/O.
76+
async fn analyze(
77+
&self,
78+
plan: LogicalPlan,
79+
config: &ConfigOptions,
80+
) -> Result<LogicalPlan>;
81+
82+
/// A human readable name for this rule.
83+
fn name(&self) -> &str;
84+
}
85+
6486
/// Rule-based Analyzer.
6587
///
6688
/// Applies [`FunctionRewrite`]s and [`AnalyzerRule`]s to transform a

datafusion/optimizer/src/lib.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,17 @@ pub mod utils;
7575
#[cfg(test)]
7676
pub mod test;
7777

78-
pub use analyzer::{Analyzer, AnalyzerRule};
78+
pub mod logical_pipeline;
79+
80+
pub use analyzer::{Analyzer, AnalyzerRule, AsyncAnalyzerRule};
81+
pub use logical_pipeline::{
82+
AsyncAnalysisPhase, AsyncOptimizationPhase, AsyncPhase, DEFAULT_ANALYSIS_PHASE,
83+
DEFAULT_OPTIMIZATION_PHASE, LogicalPlanningPipeline, Phase, Strategy,
84+
SyncAnalysisPhase, SyncOptimizationPhase, SyncPhase,
85+
};
7986
pub use optimizer::{
80-
ApplyOrder, Optimizer, OptimizerConfig, OptimizerContext, OptimizerRule,
87+
ApplyOrder, AsyncOptimizerRule, Optimizer, OptimizerConfig, OptimizerContext,
88+
OptimizerRule,
8189
};
8290

8391
pub(crate) mod join_key_set;
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::fmt::Debug;
19+
use std::sync::Arc;
20+
21+
use datafusion_common::Result;
22+
use datafusion_common::config::ConfigOptions;
23+
use datafusion_expr::LogicalPlan;
24+
25+
use crate::analyzer::AsyncAnalyzerRule;
26+
use crate::optimizer::AsyncOptimizerRule;
27+
28+
use super::Strategy;
29+
30+
/// A named, activatable phase whose rules are applied asynchronously.
31+
///
32+
/// Use [`AsyncAnalysisPhase`] for [`AsyncAnalyzerRule`]s and
33+
/// [`AsyncOptimizationPhase`] for [`AsyncOptimizerRule`]s.
34+
pub struct AsyncPhase<T: ?Sized> {
35+
pub(super) name: String,
36+
pub enabled: bool,
37+
pub strategy: Strategy,
38+
pub rules: Vec<Arc<T>>,
39+
}
40+
41+
impl<T: ?Sized> Clone for AsyncPhase<T> {
42+
fn clone(&self) -> Self {
43+
Self {
44+
name: self.name.clone(),
45+
enabled: self.enabled,
46+
strategy: self.strategy.clone(),
47+
rules: self.rules.clone(),
48+
}
49+
}
50+
}
51+
52+
impl<T: ?Sized + Debug> Debug for AsyncPhase<T> {
53+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54+
f.debug_struct("AsyncPhase")
55+
.field("name", &self.name)
56+
.field("enabled", &self.enabled)
57+
.field("strategy", &self.strategy)
58+
.field("rules", &self.rules)
59+
.finish()
60+
}
61+
}
62+
63+
impl<T: ?Sized> AsyncPhase<T> {
64+
pub fn new(name: impl Into<String>, strategy: Strategy) -> Self {
65+
Self {
66+
name: name.into(),
67+
enabled: true,
68+
strategy,
69+
rules: Vec::new(),
70+
}
71+
}
72+
73+
pub fn name(&self) -> &str {
74+
&self.name
75+
}
76+
}
77+
78+
impl AsyncPhase<dyn AsyncAnalyzerRule + Send + Sync> {
79+
pub async fn apply(
80+
&self,
81+
plan: LogicalPlan,
82+
config: &ConfigOptions,
83+
) -> Result<LogicalPlan> {
84+
if !self.enabled || self.rules.is_empty() {
85+
return Ok(plan);
86+
}
87+
let passes = match &self.strategy {
88+
Strategy::Once => 1,
89+
Strategy::FixedPoint { max_passes } => {
90+
max_passes.unwrap_or_else(|| config.optimizer.max_passes)
91+
}
92+
};
93+
let mut plan = plan;
94+
for _ in 0..passes {
95+
for rule in &self.rules {
96+
plan = rule.analyze(plan, config).await?;
97+
}
98+
}
99+
Ok(plan)
100+
}
101+
}
102+
103+
impl AsyncPhase<dyn AsyncOptimizerRule + Send + Sync> {
104+
pub async fn apply(
105+
&self,
106+
plan: LogicalPlan,
107+
config: &ConfigOptions,
108+
) -> Result<LogicalPlan> {
109+
if !self.enabled || self.rules.is_empty() {
110+
return Ok(plan);
111+
}
112+
let passes = match &self.strategy {
113+
Strategy::Once => 1,
114+
Strategy::FixedPoint { max_passes } => {
115+
max_passes.unwrap_or_else(|| config.optimizer.max_passes)
116+
}
117+
};
118+
let mut plan = plan;
119+
for _ in 0..passes {
120+
for rule in &self.rules {
121+
plan = rule.rewrite(plan, config).await?;
122+
}
123+
}
124+
Ok(plan)
125+
}
126+
}
127+
128+
pub type AsyncAnalysisPhase = AsyncPhase<dyn AsyncAnalyzerRule + Send + Sync>;
129+
pub type AsyncOptimizationPhase = AsyncPhase<dyn AsyncOptimizerRule + Send + Sync>;
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! [`LogicalPlanningPipeline`]: an ordered sequence of named [`Phase`]s that
19+
//! transform a [`LogicalPlan`] before physical planning.
20+
//!
21+
//! Inspired by Spark SQL's `Batch(name, strategy, rules)` model.
22+
23+
mod async_phase;
24+
mod pipeline;
25+
mod sync_phase;
26+
27+
pub use async_phase::{AsyncAnalysisPhase, AsyncOptimizationPhase, AsyncPhase};
28+
pub use pipeline::LogicalPlanningPipeline;
29+
pub use sync_phase::{SyncAnalysisPhase, SyncOptimizationPhase, SyncPhase};
30+
31+
/// Name of the default synchronous analysis phase.
32+
pub const DEFAULT_ANALYSIS_PHASE: &str = "analysis";
33+
34+
/// Name of the default synchronous optimization phase.
35+
pub const DEFAULT_OPTIMIZATION_PHASE: &str = "optimization";
36+
37+
/// How many times a phase's rules are applied.
38+
///
39+
/// # Choosing a strategy
40+
///
41+
/// Use [`Strategy::Once`] when rules are idempotent and a single pass always
42+
/// suffices (e.g. a post-processing cleanup that must run exactly one time).
43+
///
44+
/// Use [`Strategy::FixedPoint`] when rules interact: one rule's output may
45+
/// unlock another rule's pattern. The phase repeats until the plan signature
46+
/// stops changing or the pass limit is hit. This is the correct choice for
47+
/// both the default analysis and optimization phases.
48+
///
49+
/// Note: `FixedPoint { max_passes: Some(1) }` is mechanically equivalent to
50+
/// `Once` (both execute rules exactly one time), but carries different intent.
51+
/// `Once` means "one pass by design"; `FixedPoint { max_passes: Some(1) }`
52+
/// means "convergence loop, hard-capped at one iteration."
53+
#[derive(Clone, Debug)]
54+
pub enum Strategy {
55+
/// Apply each rule exactly once. No convergence check is performed.
56+
Once,
57+
/// Repeat until the plan stops changing or `max_passes` iterations are
58+
/// reached. Convergence is detected via [`LogicalPlanSignature`] so the
59+
/// loop exits early as soon as a full pass produces no change.
60+
///
61+
/// `max_passes: None` defers to `config.optimizer.max_passes` at runtime.
62+
///
63+
/// [`LogicalPlanSignature`]: crate::plan_signature::LogicalPlanSignature
64+
FixedPoint { max_passes: Option<usize> },
65+
}
66+
67+
/// One step in a [`LogicalPlanningPipeline`].
68+
#[derive(Clone, Debug)]
69+
pub enum Phase {
70+
SyncAnalysis(SyncAnalysisPhase),
71+
SyncOptimization(SyncOptimizationPhase),
72+
AsyncAnalysis(AsyncAnalysisPhase),
73+
AsyncOptimization(AsyncOptimizationPhase),
74+
}
75+
76+
impl Phase {
77+
pub fn name(&self) -> &str {
78+
match self {
79+
Phase::SyncAnalysis(p) => p.name(),
80+
Phase::SyncOptimization(p) => p.name(),
81+
Phase::AsyncAnalysis(p) => p.name(),
82+
Phase::AsyncOptimization(p) => p.name(),
83+
}
84+
}
85+
86+
pub fn is_async(&self) -> bool {
87+
matches!(self, Phase::AsyncAnalysis(_) | Phase::AsyncOptimization(_))
88+
}
89+
90+
pub fn is_enabled(&self) -> bool {
91+
match self {
92+
Phase::SyncAnalysis(p) => p.enabled,
93+
Phase::SyncOptimization(p) => p.enabled,
94+
Phase::AsyncAnalysis(p) => p.enabled,
95+
Phase::AsyncOptimization(p) => p.enabled,
96+
}
97+
}
98+
}

0 commit comments

Comments
 (0)