Skip to content

Commit 1f51403

Browse files
timsaucerclaude
andcommitted
feat: user-defined OptimizerRule and AnalyzerRule from Python
Expose `SessionContext.add_optimizer_rule` and `SessionContext.add_analyzer_rule` symmetric with the existing `remove_optimizer_rule`. Each accepts a Python subclass of the new `datafusion.optimizer.OptimizerRule` / `AnalyzerRule` ABCs. Implementation: * New `crates/core/src/optimizer_rules.rs` wraps user Python instances in `PyOptimizerRuleAdapter` / `PyAnalyzerRuleAdapter`, which implement the upstream `OptimizerRule` / `AnalyzerRule` traits. * `OptimizerRule.rewrite(plan)` returns `None` for "no change" or a new `LogicalPlan`. The adapter maps that to `Transformed::no` / `Transformed::yes` so the upstream optimizer's fixed-point loop terminates correctly. * `AnalyzerRule.analyze(plan)` must always return a `LogicalPlan`; returning `None` surfaces a `DataFusionError::Execution` naming the offending rule. * The upstream `&dyn OptimizerConfig` / `&ConfigOptions` arguments are not surfaced to Python in this MVP; rules that need configuration should capture it at construction time (for example by holding a `SessionContext` reference) or be implemented in Rust. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent dac9ec6 commit 1f51403

6 files changed

Lines changed: 483 additions & 0 deletions

File tree

crates/core/src/context.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1145,6 +1145,18 @@ impl PySessionContext {
11451145
self.ctx.remove_optimizer_rule(name)
11461146
}
11471147

1148+
pub fn add_optimizer_rule(&self, rule: Bound<'_, PyAny>) -> PyResult<()> {
1149+
let adapter = crate::optimizer_rules::build_optimizer_rule(rule)?;
1150+
self.ctx.add_optimizer_rule(adapter);
1151+
Ok(())
1152+
}
1153+
1154+
pub fn add_analyzer_rule(&self, rule: Bound<'_, PyAny>) -> PyResult<()> {
1155+
let adapter = crate::optimizer_rules::build_analyzer_rule(rule)?;
1156+
self.ctx.add_analyzer_rule(adapter);
1157+
Ok(())
1158+
}
1159+
11481160
pub fn table_provider(&self, name: &str, py: Python) -> PyResult<PyTable> {
11491161
let provider = wait_for_future(py, self.ctx.table_provider(name))
11501162
// Outer error: runtime/async failure

crates/core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub mod expr;
4545
#[allow(clippy::borrow_deref_ref)]
4646
mod functions;
4747
pub mod metrics;
48+
pub mod optimizer_rules;
4849
mod options;
4950
pub mod physical_plan;
5051
mod pyarrow_filter_expression;

crates/core/src/optimizer_rules.rs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Bridges between user-provided Python rule classes and the upstream
19+
//! [`OptimizerRule`] / [`AnalyzerRule`] traits.
20+
//!
21+
//! The Python side defines abstract base classes ``OptimizerRule`` and
22+
//! ``AnalyzerRule`` with ``name()`` plus, respectively, ``rewrite(plan)``
23+
//! and ``analyze(plan)``. Instances are wrapped in
24+
//! [`PyOptimizerRuleAdapter`] / [`PyAnalyzerRuleAdapter`] before being
25+
//! handed to [`SessionContext::add_optimizer_rule`] /
26+
//! [`SessionContext::add_analyzer_rule`].
27+
//!
28+
//! `rewrite` may return ``None`` to signal "no transformation" — the
29+
//! adapter maps that to [`Transformed::no`]. Any returned
30+
//! :class:`LogicalPlan` becomes [`Transformed::yes`]. `analyze` is
31+
//! mandatory-rewrite (must return a plan); returning ``None`` is an
32+
//! error.
33+
//!
34+
//! The upstream ``&dyn OptimizerConfig`` / ``&ConfigOptions`` arguments
35+
//! are not surfaced to Python in this MVP. Rules that need configuration
36+
//! access should be implemented in Rust today; Python rules read state
37+
//! from the plan and from any captured ``SessionContext`` they were
38+
//! constructed with.
39+
40+
use std::fmt;
41+
use std::sync::Arc;
42+
43+
use datafusion::common::config::ConfigOptions;
44+
use datafusion::common::tree_node::Transformed;
45+
use datafusion::error::{DataFusionError, Result as DataFusionResult};
46+
use datafusion::logical_expr::LogicalPlan;
47+
use datafusion::optimizer::analyzer::AnalyzerRule;
48+
use datafusion::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
49+
use pyo3::prelude::*;
50+
51+
use crate::errors::to_datafusion_err;
52+
use crate::sql::logical::PyLogicalPlan;
53+
54+
/// Wraps a Python ``OptimizerRule`` instance so that it can be registered
55+
/// with the upstream optimizer pipeline.
56+
pub struct PyOptimizerRuleAdapter {
57+
rule: Py<PyAny>,
58+
name: String,
59+
}
60+
61+
impl PyOptimizerRuleAdapter {
62+
pub fn new(rule: Bound<'_, PyAny>) -> PyResult<Self> {
63+
let name = rule.call_method0("name")?.extract::<String>()?;
64+
Ok(Self {
65+
rule: rule.unbind(),
66+
name,
67+
})
68+
}
69+
}
70+
71+
impl fmt::Debug for PyOptimizerRuleAdapter {
72+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73+
f.debug_struct("PyOptimizerRuleAdapter")
74+
.field("name", &self.name)
75+
.finish()
76+
}
77+
}
78+
79+
impl OptimizerRule for PyOptimizerRuleAdapter {
80+
fn name(&self) -> &str {
81+
&self.name
82+
}
83+
84+
fn rewrite(
85+
&self,
86+
plan: LogicalPlan,
87+
_config: &dyn OptimizerConfig,
88+
) -> DataFusionResult<Transformed<LogicalPlan>> {
89+
Python::attach(|py| {
90+
let py_plan = PyLogicalPlan::from(plan.clone());
91+
let result = self
92+
.rule
93+
.bind(py)
94+
.call_method1("rewrite", (py_plan,))
95+
.map_err(to_datafusion_err)?;
96+
if result.is_none() {
97+
return Ok(Transformed::no(plan));
98+
}
99+
let rewritten: PyLogicalPlan = result.extract().map_err(to_datafusion_err)?;
100+
Ok(Transformed::yes(LogicalPlan::from(rewritten)))
101+
})
102+
}
103+
}
104+
105+
/// Wraps a Python ``AnalyzerRule`` instance so that it can be registered
106+
/// with the upstream analyzer pipeline.
107+
pub struct PyAnalyzerRuleAdapter {
108+
rule: Py<PyAny>,
109+
name: String,
110+
}
111+
112+
impl PyAnalyzerRuleAdapter {
113+
pub fn new(rule: Bound<'_, PyAny>) -> PyResult<Self> {
114+
let name = rule.call_method0("name")?.extract::<String>()?;
115+
Ok(Self {
116+
rule: rule.unbind(),
117+
name,
118+
})
119+
}
120+
}
121+
122+
impl fmt::Debug for PyAnalyzerRuleAdapter {
123+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124+
f.debug_struct("PyAnalyzerRuleAdapter")
125+
.field("name", &self.name)
126+
.finish()
127+
}
128+
}
129+
130+
impl AnalyzerRule for PyAnalyzerRuleAdapter {
131+
fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> DataFusionResult<LogicalPlan> {
132+
Python::attach(|py| {
133+
let py_plan = PyLogicalPlan::from(plan);
134+
let result = self
135+
.rule
136+
.bind(py)
137+
.call_method1("analyze", (py_plan,))
138+
.map_err(to_datafusion_err)?;
139+
if result.is_none() {
140+
return Err(DataFusionError::Execution(format!(
141+
"AnalyzerRule {} returned None from analyze(); analyzer rules \
142+
must return a LogicalPlan",
143+
self.name
144+
)));
145+
}
146+
let rewritten: PyLogicalPlan = result.extract().map_err(to_datafusion_err)?;
147+
Ok(LogicalPlan::from(rewritten))
148+
})
149+
}
150+
151+
fn name(&self) -> &str {
152+
&self.name
153+
}
154+
}
155+
156+
/// Construct an adapter from a Python ``OptimizerRule`` instance.
157+
pub(crate) fn build_optimizer_rule(
158+
rule: Bound<'_, PyAny>,
159+
) -> PyResult<Arc<dyn OptimizerRule + Send + Sync>> {
160+
Ok(Arc::new(PyOptimizerRuleAdapter::new(rule)?))
161+
}
162+
163+
/// Construct an adapter from a Python ``AnalyzerRule`` instance.
164+
pub(crate) fn build_analyzer_rule(
165+
rule: Bound<'_, PyAny>,
166+
) -> PyResult<Arc<dyn AnalyzerRule + Send + Sync>> {
167+
Ok(Arc::new(PyAnalyzerRuleAdapter::new(rule)?))
168+
}

python/datafusion/context.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
from datafusion.catalog import CatalogProvider, Table
9191
from datafusion.common import DFSchema
9292
from datafusion.expr import Expr, SortKey
93+
from datafusion.optimizer import AnalyzerRule, OptimizerRule
9394
from datafusion.plan import ExecutionPlan, LogicalPlan
9495
from datafusion.user_defined import (
9596
AggregateUDF,
@@ -1260,6 +1261,52 @@ def register_udwf(self, udwf: WindowUDF) -> None:
12601261
"""Register a user-defined window function (UDWF) with the context."""
12611262
self.ctx.register_udwf(udwf._udwf)
12621263

1264+
def add_optimizer_rule(self, rule: OptimizerRule) -> None:
1265+
"""Append a user-defined :class:`OptimizerRule` to the session.
1266+
1267+
The rule's :py:meth:`OptimizerRule.rewrite` method is invoked
1268+
during query planning. Returning ``None`` from ``rewrite``
1269+
signals no change; returning a new
1270+
:class:`~datafusion.plan.LogicalPlan` signals a rewrite.
1271+
1272+
Args:
1273+
rule: An instance of a class that implements
1274+
:class:`datafusion.optimizer.OptimizerRule`.
1275+
1276+
Examples:
1277+
>>> from datafusion.optimizer import OptimizerRule
1278+
>>> class NoopRule(OptimizerRule):
1279+
... def name(self) -> str: return "noop"
1280+
... def rewrite(self, plan): return None
1281+
>>> ctx = dfn.SessionContext()
1282+
>>> ctx.add_optimizer_rule(NoopRule())
1283+
>>> ctx.remove_optimizer_rule("noop")
1284+
True
1285+
"""
1286+
self.ctx.add_optimizer_rule(rule)
1287+
1288+
def add_analyzer_rule(self, rule: AnalyzerRule) -> None:
1289+
"""Append a user-defined :class:`AnalyzerRule` to the session.
1290+
1291+
The rule's :py:meth:`AnalyzerRule.analyze` method is invoked
1292+
during the analysis phase of query planning. Analyzer rules
1293+
must always return a :class:`~datafusion.plan.LogicalPlan`
1294+
(return the input plan unchanged when no rewrite applies).
1295+
1296+
Args:
1297+
rule: An instance of a class that implements
1298+
:class:`datafusion.optimizer.AnalyzerRule`.
1299+
1300+
Examples:
1301+
>>> from datafusion.optimizer import AnalyzerRule
1302+
>>> class Identity(AnalyzerRule):
1303+
... def name(self) -> str: return "identity"
1304+
... def analyze(self, plan): return plan
1305+
>>> ctx = dfn.SessionContext()
1306+
>>> ctx.add_analyzer_rule(Identity())
1307+
"""
1308+
self.ctx.add_analyzer_rule(rule)
1309+
12631310
def deregister_udwf(self, name: str) -> None:
12641311
"""Remove a user-defined window function from the session.
12651312

0 commit comments

Comments
 (0)