Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/compute/src/render/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,13 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
VecCollection<'scope, T, DataflowErrorSer, Diff>,
) {
mfp.optimize();
let mfp_plan = mfp.clone().into_plan().unwrap();
// NOTE: only an un-extractable temporal predicate makes `into_plan` fail;
// `ExprPrepMaintained` rejects those at plan time and peeks materialize
// `mz_now()`, so none reach render.
let mfp_plan = mfp
.clone()
.into_plan()
.expect("temporal predicate extractable");

// If the MFP is trivial, we can just call `as_collection`.
// In the case that we weren't going to apply the `key_val` optimization,
Expand All @@ -959,7 +965,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
let max_demand = mfp.demand().last().map(|x| *x + 1).unwrap_or(0);
mfp.permute_fn(|c| c, max_demand);
mfp.optimize();
let mfp_plan = mfp.into_plan().unwrap();
let mfp_plan = mfp.into_plan().expect("temporal predicate extractable");

let mut datum_vec = DatumVec::new();
// Wrap in an `Rc` so that lifetimes work out.
Expand Down
175 changes: 175 additions & 0 deletions src/expr/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,59 @@ impl MirScalarExpr {
}
}

/// Whether [`Self::undistribute_and_or`] can rewrite `self` without changing
/// its error semantics.
///
/// Undistribution recombines the operands *not* common to every disjunct into
/// a new AND/OR, changing the short-circuit context they evaluate in. Only a
/// dominating operand (`false`/`true`), not a NULL one, absorbs another
/// operand's error, so recombining a could-error operand can change whether a
/// row errors (e.g. `a OR (a AND <err>)` raises for a NULL `a`, but
/// undistributes to `a`).
///
/// Temporal predicates are exempt (see the `contains_temporal` check): a
/// shared `mz_now() <op> t` must stay liftable out of the OR or the dataflow
/// can't be planned, and `mz_now()` is unmaterializable so there is no runtime
/// error to preserve.
fn undistribution_preserves_errors(&self) -> bool {
let MirScalarExpr::CallVariadic {
func: func @ (VariadicFunc::And(_) | VariadicFunc::Or(_)),
exprs,
} = self
else {
return true;
};
// A shared temporal predicate (`mz_now() <op> t`) must stay liftable out
// of the OR, else the renderer can't extract its time bound and the view
// fails to plan (and an existing one fails to re-optimize on upgrade,
// halting environmentd). `mz_now()` is unmaterializable, so it carries no
// runtime value and there is no error semantics to preserve here anyway.
if self.contains_temporal() {
return true;
}
// Fast path for the common case: with no erroring operand there is
// nothing to preserve, so skip the per-operand bookkeeping below.
// `could_error` recurses, so this also covers nested operands.
if !exprs.iter().any(|o| o.could_error()) {
return true;
}
let inner = func.switch_and_or();
// The operands of a disjunct (a non-`inner` disjunct is its own singleton).
let operands = |o: &MirScalarExpr| match o {
MirScalarExpr::CallVariadic { func, exprs } if *func == inner => exprs.clone(),
_ => vec![o.clone()],
};
let common = exprs
.iter()
.map(|o| BTreeSet::from_iter(operands(o)))
.reduce(|a, b| &a & &b)
.unwrap_or_default();
exprs
.iter()
.flat_map(operands)
.all(|op| !op.could_error() || common.contains(&op))
}

/// AND/OR undistribution (factoring out) to apply at each `MirScalarExpr`.
///
/// This method attempts to apply one of the [distribution laws][distributivity]
Expand Down Expand Up @@ -922,6 +975,11 @@ impl MirScalarExpr {
while old_self != *self {
old_self = self.clone();
self.reduce_and_canonicalize_and_or(); // We don't want to deal with 1-arg AND/OR at the top

if !self.undistribution_preserves_errors() {
return;
}

if let MirScalarExpr::CallVariadic {
exprs: outer_operands,
func: outer_func @ (VariadicFunc::Or(_) | VariadicFunc::And(_)),
Expand Down Expand Up @@ -2475,6 +2533,123 @@ mod tests {
use super::*;
use crate::scalar::func::variadic::Coalesce;

#[mz_ore::test]
fn test_reduce_and_or_preserves_operand_errors() {
// #37049: AND/OR are non-strict, so a `false`/`true` operand absorbs an
// erroring operand at runtime. `reduce` must not fold or absorb an
// erroring operand into an error the original never raises.
let bool_typ = ReprScalarType::Bool;
let types = vec![bool_typ.clone().nullable(true)];
let err = || MirScalarExpr::literal(Err(EvalError::DivisionByZero), bool_typ.clone());
let col = || MirScalarExpr::column(0);
let arena = RowArena::new();

// `x AND <err>` must not fold to the error: `false AND <err>` is `false`.
let mut and = col().and(err());
and.reduce(&types);
assert!(!and.is_literal_err(), "{and:?}");
assert_eq!(and.eval(&[Datum::False], &arena), Ok(Datum::False));

// `a OR (a AND <err>)` must not absorb to `a`: a NULL `a` surfaces the
// error (`NULL OR (NULL AND <err>)` = `<err>`).
let mut or = col().or(col().and(err()));
or.reduce(&types);
assert_ne!(or, col(), "{or:?}");
assert_eq!(or.eval(&[Datum::True], &arena), Ok(Datum::True));
assert_eq!(or.eval(&[Datum::False], &arena), Ok(Datum::False));
assert_eq!(
or.eval(&[Datum::Null], &arena),
Err(EvalError::DivisionByZero)
);
}

#[mz_ore::test]
fn test_reduce_and_or_undistributes_common_error() {
// CLU-137: undistribution must still factor out a could-error operand
// common to every disjunct (e.g. a temporal `mz_now() < <expr>`), so it
// becomes a standalone conjunct the renderer can extract. The reverted
// #37049 guard skipped undistribution for any could-error expression,
// burying such predicates inside the OR.
let bool_typ = ReprScalarType::Bool;
let types = vec![
bool_typ.clone().nullable(true),
bool_typ.clone().nullable(true),
];
let err = || MirScalarExpr::literal(Err(EvalError::DivisionByZero), bool_typ.clone());
let arena = RowArena::new();

// `(a AND <err>) OR (b AND <err>)` --> `<err> AND (a OR b)`.
let original = MirScalarExpr::column(0)
.and(err())
.or(MirScalarExpr::column(1).and(err()));
let mut reduced = original.clone();
reduced.reduce(&types);
assert!(
matches!(
&reduced,
MirScalarExpr::CallVariadic {
func: VariadicFunc::And(_),
..
}
),
"common error operand not factored out: {reduced:?}"
);
// ...and error semantics are preserved over every assignment.
for a in [Datum::True, Datum::False, Datum::Null] {
for b in [Datum::True, Datum::False, Datum::Null] {
assert_eq!(
reduced.eval(&[a, b], &arena),
original.eval(&[a, b], &arena)
);
}
}
}

#[mz_ore::test]
fn test_reduce_and_or_undistributes_temporal_despite_noncommon_error() {
// CLU-137: a temporal predicate `T = mz_now() < t` shared by every
// disjunct must still be factored out even when a disjunct also carries a
// *non-common* operand that could error (here `(#3 / #4) > 0`). Factoring
// T out is not error-preserving in that case, but the error-preservation
// guard exempts temporal predicates: T must stay extractable or the view
// can't be planned (and an existing one fails to re-optimize on upgrade).
let types = vec![
ReprScalarType::Bool.nullable(true), // #0
ReprScalarType::Bool.nullable(true), // #1
ReprScalarType::MzTimestamp.nullable(true), // #2
ReprScalarType::Int32.nullable(true), // #3
ReprScalarType::Int32.nullable(true), // #4
];
let col = MirScalarExpr::column;
let temporal = || {
MirScalarExpr::CallUnmaterializable(UnmaterializableFunc::MzNow)
.call_binary(col(2), func::Lt)
};
let noncommon_err = || {
col(3).call_binary(col(4), func::DivInt32).call_binary(
MirScalarExpr::literal_ok(Datum::Int32(0), ReprScalarType::Int32),
func::Gt,
)
};

// `(#0 AND T) OR (#1 AND T AND E)`: T common, E non-common, both could_error.
let mut reduced = col(0)
.and(temporal())
.or(col(1).and(temporal()).and(noncommon_err()));
reduced.reduce(&types);

// T is lifted to a top-level AND conjunct, so the renderer can extract it.
let factored_out = matches!(
&reduced,
MirScalarExpr::CallVariadic { func: VariadicFunc::And(_), exprs }
if exprs.contains(&temporal())
);
assert!(
factored_out,
"temporal predicate not factored out: {reduced:?}"
);
}

#[mz_ore::test]
#[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
fn test_reduce() {
Expand Down
19 changes: 16 additions & 3 deletions src/expr/src/scalar/reduce/variadic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,22 @@ pub(super) fn reduce_call_variadic(
*e = MirScalarExpr::literal_null(e.typ(column_types).scalar_type);
return;
}
if let Some(err) = exprs.iter().find_map(|x| x.as_literal_err()) {
*e = MirScalarExpr::literal(Err(err.clone()), e.typ(column_types).scalar_type);
return;
// Fold the call to an operand's literal error only when the function is
// strict in errors: any operand error must make the whole call error. The
// non-strict variadics are excluded, because they can absorb an operand's
// error at runtime: AND/OR via a dominating `false`/`true` operand
// (`false AND <error>` is `false`), and ErrorIfNull because it evaluates its
// message argument only when the first argument is null. Every other variadic
// here evaluates all of its operands, so the fold is valid. (Coalesce, also
// non-strict, bailed out above.)
if !matches!(
func,
VariadicFunc::And(_) | VariadicFunc::Or(_) | VariadicFunc::ErrorIfNull(_)
) {
if let Some(err) = exprs.iter().find_map(|x| x.as_literal_err()) {
*e = MirScalarExpr::literal(Err(err.clone()), e.typ(column_types).scalar_type);
return;
}
}

// Per-function dispatch. Arms are mutually exclusive on discriminant; the
Expand Down
13 changes: 13 additions & 0 deletions test/sqllogictest/error_semantics.slt
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,16 @@ select coalesce(a, (select a/b from test)) from test;

query error Evaluation error: division by zero
select *, case when a = 5 then (select a/b from test) else 7 end from test;

# Regression for #37049. AND/OR are non-strict: a `false`/`true` operand absorbs
# an erroring operand at runtime (`false AND <error>` is `false`), so `reduce`
# must not fold `x AND <error>` / `x OR <error>` to the error. Before the fix
# these spuriously errored with division by zero (test.a = 1).
query I
select a from test where (a = 2) and (1/0 = 1);
----

query I
select a from test where (a = 1) or (1/0 = 1);
----
1
73 changes: 73 additions & 0 deletions test/sqllogictest/temporal.slt
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,76 @@ FROM (SELECT CAST('62143-12-31' AS date) + '200000 YEARS' AS c2
FROM (SELECT
FROM (SELECT FROM t) AS subq_0) AS subq_1
WHERE pg_catalog."current_timestamp"() = pg_catalog.pg_postmaster_start_time()) AS subq_2;

# Regression test for https://linear.app/materializeinc/issue/CLU-137
#
# A temporal predicate (`mz_now() < ...`) shared across both branches of a
# disjunction of conjunctions must be factored out of the disjunction, so that
# it becomes a standalone conjunct the renderer can extract as a temporal
# bound. The factoring is done by `undistribute_and_or` during predicate
# canonicalization. The error-propagation fix in #37049 had skipped
# undistribution for any expression that could error, and `mz_now()` always
# counts as could-error, so the temporal predicate stayed buried inside the OR
# and the renderer panicked with "Unsupported temporal predicate".

# A constant collection (`VALUES`) is readable at any logical time, so the MV
# can be queried `AS OF` a small timestamp, like the `valid` MV above.
statement ok
CREATE VIEW clu137 (active, confirmed, enabled, label, valid_until) AS VALUES
(true, true, true, CAST(NULL AS text), 10),
(true, true, true, '', 20),
(true, true, true, 'x', 30),
(false, true, true, NULL, 40);

statement ok
CREATE MATERIALIZED VIEW clu137_mv AS
SELECT valid_until FROM clu137
WHERE (active AND confirmed AND enabled AND label IS NULL AND mz_now() < valid_until)
OR (active AND confirmed AND enabled AND label = '' AND mz_now() < valid_until);

query I rowsort
SELECT * FROM clu137_mv AS OF 5;
----
10
20

query I rowsort
SELECT * FROM clu137_mv AS OF 15;
----
20

query I rowsort
SELECT * FROM clu137_mv AS OF 25;
----

# A temporal predicate shared by every disjunct must stay extractable even when
# a disjunct also has a non-common operand that could error (here
# `(100 / divisor) > 0`). `undistribute_and_or` exempts temporal predicates from
# its error-preservation guard, so `mz_now() < valid_until` is still factored out
# of the OR and the view builds. Without the exemption it would fail to plan,
# which would also break re-optimization of an existing such view on upgrade.
statement ok
CREATE VIEW clu137_fallible (active, label, divisor, valid_until) AS VALUES
(true, CAST(NULL AS text), 5, 10),
(true, '', 5, 20);

statement ok
CREATE MATERIALIZED VIEW clu137_fallible_mv AS
SELECT valid_until FROM clu137_fallible
WHERE (active AND label IS NULL AND mz_now() < valid_until)
OR (active AND label = '' AND mz_now() < valid_until AND (100 / divisor) > 0);

query I rowsort
SELECT * FROM clu137_fallible_mv AS OF 5;
----
10
20

query I rowsort
SELECT * FROM clu137_fallible_mv AS OF 15;
----
20

query I rowsort
SELECT * FROM clu137_fallible_mv AS OF 25;
----
Loading