Skip to content

Commit 8727044

Browse files
committed
fix(pipeline): restore optimizer rule error context and skip_failed_rules support
1 parent d2b97de commit 8727044

1 file changed

Lines changed: 104 additions & 41 deletions

File tree

datafusion/optimizer/src/logical_pipeline/sync_phase.rs

Lines changed: 104 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use datafusion_common::config::ConfigOptions;
2424
use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
2525
use datafusion_expr::expr_rewriter::FunctionRewrite;
2626
use datafusion_expr::{InvariantLevel, LogicalPlan, assert_expected_schema};
27+
use log::warn;
2728

2829
use crate::analyzer::AnalyzerRule;
2930
use crate::analyzer::function_rewrite::ApplyFunctionRewrites;
@@ -261,28 +262,58 @@ impl SyncPhase<dyn OptimizerRule + Send + Sync> {
261262

262263
'outer: for _ in 0..passes {
263264
for rule in &self.rules {
265+
let prev_plan = config
266+
.options()
267+
.optimizer
268+
.skip_failed_rules
269+
.then(|| plan.clone());
270+
264271
let rule_starting_schema = Arc::clone(plan.schema());
265-
let result: Transformed<LogicalPlan> = match rule.apply_order() {
266-
Some(order) => plan.rewrite_with_subqueries(
267-
&mut RuleRewriter::new(order, rule.as_ref(), config),
268-
)?,
269-
None => rule.rewrite(plan, config)?,
270-
};
271-
plan = result.data;
272-
assert_expected_schema(&rule_starting_schema, &plan).map_err(|e| {
273-
e.context(format!(
274-
"Optimizer rule '{}' changed the schema",
275-
rule.name()
276-
))
277-
})?;
278-
#[cfg(debug_assertions)]
279-
plan.check_invariants(InvariantLevel::Executable)
280-
.map_err(|e| {
281-
e.context(format!(
282-
"Invalid plan after optimizer rule '{}'",
272+
let result =
273+
match rule.apply_order() {
274+
Some(order) => plan.rewrite_with_subqueries(
275+
&mut RuleRewriter::new(order, rule.as_ref(), config),
276+
),
277+
None => rule.rewrite(plan, config),
278+
}
279+
.and_then(|t| {
280+
assert_expected_schema(&rule_starting_schema, &t.data).map_err(
281+
|e| {
282+
e.context(format!(
283+
"Optimizer rule '{}' changed the schema",
284+
rule.name()
285+
))
286+
},
287+
)?;
288+
#[cfg(debug_assertions)]
289+
t.data
290+
.check_invariants(InvariantLevel::Executable)
291+
.map_err(|e| {
292+
e.context(format!(
293+
"Invalid plan after optimizer rule '{}'",
294+
rule.name()
295+
))
296+
})?;
297+
Ok(t)
298+
});
299+
300+
match (result, prev_plan) {
301+
(Ok(t), _) => plan = t.data,
302+
(Err(e), Some(orig)) => {
303+
warn!(
304+
"Skipping optimizer rule '{}' due to unexpected error: {}",
305+
rule.name(),
306+
e
307+
);
308+
plan = orig;
309+
}
310+
(Err(e), None) => {
311+
return Err(e.context(format!(
312+
"Optimizer rule '{}' failed",
283313
rule.name()
284-
))
285-
})?;
314+
)));
315+
}
316+
}
286317
}
287318
let plan_is_fresh = previous_plans.insert(LogicalPlanSignature::new(&plan));
288319
if !plan_is_fresh {
@@ -324,29 +355,61 @@ impl SyncPhase<dyn OptimizerRule + Send + Sync> {
324355

325356
'outer: for _ in 0..passes {
326357
for rule in &self.rules {
358+
let prev_plan = config
359+
.options()
360+
.optimizer
361+
.skip_failed_rules
362+
.then(|| plan.clone());
363+
327364
let rule_starting_schema = Arc::clone(plan.schema());
328-
let result: Transformed<LogicalPlan> = match rule.apply_order() {
329-
Some(order) => plan.rewrite_with_subqueries(
330-
&mut RuleRewriter::new(order, rule.as_ref(), config),
331-
)?,
332-
None => rule.rewrite(plan, config)?,
333-
};
334-
plan = result.data;
335-
assert_expected_schema(&rule_starting_schema, &plan).map_err(|e| {
336-
e.context(format!(
337-
"Optimizer rule '{}' changed the schema",
338-
rule.name()
339-
))
340-
})?;
341-
#[cfg(debug_assertions)]
342-
plan.check_invariants(InvariantLevel::Executable)
343-
.map_err(|e| {
344-
e.context(format!(
345-
"Invalid plan after optimizer rule '{}'",
365+
let result =
366+
match rule.apply_order() {
367+
Some(order) => plan.rewrite_with_subqueries(
368+
&mut RuleRewriter::new(order, rule.as_ref(), config),
369+
),
370+
None => rule.rewrite(plan, config),
371+
}
372+
.and_then(|t| {
373+
assert_expected_schema(&rule_starting_schema, &t.data).map_err(
374+
|e| {
375+
e.context(format!(
376+
"Optimizer rule '{}' changed the schema",
377+
rule.name()
378+
))
379+
},
380+
)?;
381+
#[cfg(debug_assertions)]
382+
t.data
383+
.check_invariants(InvariantLevel::Executable)
384+
.map_err(|e| {
385+
e.context(format!(
386+
"Invalid plan after optimizer rule '{}'",
387+
rule.name()
388+
))
389+
})?;
390+
Ok(t)
391+
});
392+
393+
match (result, prev_plan) {
394+
(Ok(t), _) => {
395+
plan = t.data;
396+
observer(&plan, rule.name());
397+
}
398+
(Err(e), Some(orig)) => {
399+
warn!(
400+
"Skipping optimizer rule '{}' due to unexpected error: {}",
401+
rule.name(),
402+
e
403+
);
404+
plan = orig;
405+
}
406+
(Err(e), None) => {
407+
return Err(e.context(format!(
408+
"Optimizer rule '{}' failed",
346409
rule.name()
347-
))
348-
})?;
349-
observer(&plan, rule.name());
410+
)));
411+
}
412+
}
350413
}
351414
let plan_is_fresh = previous_plans.insert(LogicalPlanSignature::new(&plan));
352415
if !plan_is_fresh {

0 commit comments

Comments
 (0)