diff --git a/src/compute-types/src/plan.rs b/src/compute-types/src/plan.rs index 987d42fec800c..4992f48c4faa4 100644 --- a/src/compute-types/src/plan.rs +++ b/src/compute-types/src/plan.rs @@ -490,36 +490,9 @@ impl Plan { Self::refine_union_negate_consolidation(&mut dataflow); } - if dataflow.is_single_time() { - Self::refine_single_time_operator_selection(&mut dataflow); - - // The relaxation of the `must_consolidate` flag performs an LIR-based - // analysis and transform under checked recursion. By a similar argument - // made in `from_mir`, we do not expect the recursion limit to be hit. - // However, if that happens, we propagate an error to the caller. - // To apply the transform, we first obtain monotonic source and index - // global IDs and add them to a `TransformConfig` instance. - let monotonic_ids = dataflow - .source_imports - .iter() - .filter_map(|(id, source_import)| source_import.monotonic.then_some(*id)) - .chain( - dataflow - .index_imports - .iter() - .filter_map(|(_id, index_import)| { - if index_import.monotonic { - Some(index_import.desc.on_id) - } else { - None - } - }), - ) - .collect::>(); + Self::refine_single_time_operator_selection(&mut dataflow); - let config = TransformConfig { monotonic_ids }; - Self::refine_single_time_consolidation(&mut dataflow, &config)?; - } + Self::refine_single_time_consolidation(&mut dataflow)?; soft_assert_eq_no_log!(dataflow.check_invariants(), Ok(())); @@ -639,18 +612,36 @@ impl Plan { mz_repr::explain::trace_plan(dataflow); } - /// Refines the plans of objects to be built as part of `dataflow` to take advantage - /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a - /// one-shot SELECT query. + /// If the dataflow refers to a single-time, i.e., is for a one-shot SELECT query, this function + /// refines the `dataflow` to use monotonic operators. + /// + /// Note that we set the `must_consolidate` flag on the monotonic operators, i.e., we don't + /// assume physical monotonicity here. Reasoning about physical monotonicity and refining the + /// `must_consolidate` flag is the responsibility of `refine_single_time_consolidation`. + /// + /// Note that, strictly speaking, choosing monotonic operators is valid only by assuming that + /// - all inputs of single-time dataflows are logically (i.e., after consolidation) monotonic; + /// - it's not possible to introduce logically non-monotonic collections inside a single-time + /// dataflow when the inputs are logically monotonic. + /// + /// This assumption is not true when there are negative accumulations, which can be introduced + /// by bugs (in Materialize or external systems), or by user error when using `repeat_row`. + /// In such cases, the introduced monotonic operators won't produce correct results. This is + /// considered ok, because: + /// - In most cases, monotonic operators will detect non-monotonic input, and cleanly error out. + /// - In some cases, a monotonic operator might produce garbage output when given non-monotonic + /// input. Even this is considered ok, because we generally don't expect the system to always + /// detect negative accumulations (because this seems to be ~impossible to achieve without + /// adding further resource usage to various operators, not just monotonic operators). #[mz_ore::instrument( target = "optimizer", level = "debug", fields(path.segment = "refine_single_time_operator_selection") )] fn refine_single_time_operator_selection(dataflow: &mut DataflowDescription) { - // We should only reach here if we have a one-shot SELECT query, i.e., - // a single-time dataflow. - assert!(dataflow.is_single_time()); + if !dataflow.is_single_time() { + return; + } // Upgrade single-time plans to monotonic. for build_desc in dataflow.objects_to_build.iter_mut() { @@ -698,16 +689,41 @@ impl Plan { )] fn refine_single_time_consolidation( dataflow: &mut DataflowDescription, - config: &TransformConfig, ) -> Result<(), String> { - // We should only reach here if we have a one-shot SELECT query, i.e., - // a single-time dataflow. - assert!(dataflow.is_single_time()); + if !dataflow.is_single_time() { + return Ok(()); + } + + // The relaxation of the `must_consolidate` flag performs an LIR-based + // analysis and transform under checked recursion. By a similar argument + // made in `from_mir`, we do not expect the recursion limit to be hit. + // However, if that happens, we propagate an error to the caller. + // To apply the transform, we first obtain monotonic source and index + // global IDs and add them to a `TransformConfig` instance. + let monotonic_ids = dataflow + .source_imports + .iter() + .filter_map(|(id, source_import)| source_import.monotonic.then_some(*id)) + .chain( + dataflow + .index_imports + .iter() + .filter_map(|(_id, index_import)| { + if index_import.monotonic { + Some(index_import.desc.on_id) + } else { + None + } + }), + ) + .collect::>(); + + let config = TransformConfig { monotonic_ids }; let transform = transform::RelaxMustConsolidate; for build_desc in dataflow.objects_to_build.iter_mut() { transform - .transform(config, &mut build_desc.plan) + .transform(&config, &mut build_desc.plan) .map_err(|_| "Maximum recursion limit error in consolidation relaxation.")?; } mz_repr::explain::trace_plan(dataflow); diff --git a/src/compute-types/src/plan/interpret/physically_monotonic.rs b/src/compute-types/src/plan/interpret/physically_monotonic.rs index a48de2f6a7e57..e7cc1945d06dc 100644 --- a/src/compute-types/src/plan/interpret/physically_monotonic.rs +++ b/src/compute-types/src/plan/interpret/physically_monotonic.rs @@ -139,12 +139,10 @@ impl Interpreter for SingleTimeMonotonic<'_> { _input_key: &Option>, input: Self::Domain, _exprs: &Vec, - _func: &TableFunc, + func: &TableFunc, _mfp: &MapFilterProject, ) -> Self::Domain { - // In a single-time context, we just propagate the monotonicity - // status of the input - input + PhysicallyMonotonic(input.0 && func.preserves_monotonicity()) } fn join( diff --git a/test/sqllogictest/transform/relax_must_consolidate.slt b/test/sqllogictest/transform/lir_monotonicity.slt similarity index 83% rename from test/sqllogictest/transform/relax_must_consolidate.slt rename to test/sqllogictest/transform/lir_monotonicity.slt index 2540946a84c14..e1bac92bfb6fa 100644 --- a/test/sqllogictest/transform/relax_must_consolidate.slt +++ b/test/sqllogictest/transform/lir_monotonicity.slt @@ -7,11 +7,10 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. -# -# Test relaxation of the must_consolidate flag in LIR refinements -# for single-time dataflows (aka monotonic one-shot `SELECT`s). -# PR https://github.com/MaterializeInc/materialize/pull/19680 -# +# Tests LIR optimizations in single-time dataflows (one-shot SELECTs): +# - refine_single_time_consolidation -> RelaxMustConsolidate -> SingleTimeMonotonic -> PhysicallyMonotonic tweaking the +# `must_consolidate` flag on LIR operators. +# - refine_single_time_operator_selection tweaking the `monotonic` flag on LIR operators. statement ok CREATE TABLE t (a int, b int); @@ -1203,3 +1202,169 @@ Explained Query: Target cluster: quickstart EOF + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_repeat_row = true +---- +COMPLETE 0 + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_repeat_row_non_negative = true +---- +COMPLETE 0 + +statement ok +CREATE SOURCE mono_src FROM WEBHOOK BODY FORMAT TEXT; + +# Regression test for https://github.com/MaterializeInc/database-issues/issues/11310, i.e., that we set the +# `must_consolidate` flag when `repeat_row` is involved. +# Relies on RelaxMustConsolidate's SingleTimeMonotonic correctly handling FlatMap's TableFunc being !preserves_monotonicity. +# Note that the plans are still somewhat wrong, in that monotonic operators won't produce correct results when presented +# with negative accumulations even with the `must_consolidate` flag set. However, the expectation is that they'll have +# an easier time detecting negative accumulations and cleanly erroring out when the `must_consolidate` flag is set. +# See also https://github.com/MaterializeInc/materialize/pull/36184 for more discussion, where we considered not +# choosing monotonic operators when `repeat_row` is involved, but eventually decided to go with the current behavior. +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +SELECT MAX(body) FROM mono_src, repeat_row(length(body)); +---- +Explained Query: + With + cte l0 = + Reduce::Hierarchical + aggr_funcs=[max] + monotonic + must_consolidate + key_plan + project=() + val_plan=id + FlatMap repeat_row(integer_to_bigint(char_length(#0{body}))) + Get::PassArrangements materialize.public.mono_src + raw=true + Return + Union + ArrangeBy + input_key=[] + raw=true + Get::PassArrangements l0 + raw=false + arrangements[0]={ key=[], permutation=id, thinning=(#0) } + Mfp + project=(#0) + map=(null) + Union consolidate_output=true + Negate + Get::Arrangement l0 + project=() + key= + raw=false + arrangements[0]={ key=[], permutation=id, thinning=(#0) } + Constant + - () + +Source materialize.public.mono_src + +Target cluster: quickstart + +EOF + +# Relies on RelaxMustConsolidate's SingleTimeMonotonic correctly handling negative diffs in Constants. +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +SELECT MAX(body) FROM mono_src, repeat_row(-1); +---- +Explained Query: + With + cte l0 = + Reduce::Hierarchical + aggr_funcs=[max] + monotonic + must_consolidate + key_plan + project=() + val_plan=id + Join::Linear + linear_stage[0] + lookup={ relation=1, key=[] } + stream={ key=[], thinning=(#0) } + source={ relation=0, key=[] } + ArrangeBy + raw=true + arrangements[0]={ key=[], permutation=id, thinning=(#0) } + Get::PassArrangements materialize.public.mono_src + raw=true + ArrangeBy + raw=true + arrangements[0]={ key=[], permutation=id, thinning=() } + Constant + - (() x -1) + Return + Union + ArrangeBy + input_key=[] + raw=true + Get::PassArrangements l0 + raw=false + arrangements[0]={ key=[], permutation=id, thinning=(#0) } + Mfp + project=(#0) + map=(null) + Union consolidate_output=true + Negate + Get::Arrangement l0 + project=() + key= + raw=false + arrangements[0]={ key=[], permutation=id, thinning=(#0) } + Constant + - () + +Source materialize.public.mono_src + +Target cluster: quickstart + +EOF + +# Relies on `repeat_row_non_negative` being marked as `preserves_monotonicity` (as opposed to `repeat_row`). +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +SELECT MAX(body) FROM mono_src, repeat_row_non_negative(length(body)); +---- +Explained Query: + With + cte l0 = + Reduce::Hierarchical + aggr_funcs=[max] + monotonic + key_plan + project=() + val_plan=id + FlatMap repeat_row_non_negative(integer_to_bigint(char_length(#0{body}))) + Get::PassArrangements materialize.public.mono_src + raw=true + Return + Union + ArrangeBy + input_key=[] + raw=true + Get::PassArrangements l0 + raw=false + arrangements[0]={ key=[], permutation=id, thinning=(#0) } + Mfp + project=(#0) + map=(null) + Union consolidate_output=true + Negate + Get::Arrangement l0 + project=() + key= + raw=false + arrangements[0]={ key=[], permutation=id, thinning=(#0) } + Constant + - () + +Source materialize.public.mono_src + +Target cluster: quickstart + +EOF diff --git a/test/sqllogictest/transform/monotonic.slt b/test/sqllogictest/transform/monotonic.slt index 345bff4a350de..67a44ffa99da8 100644 --- a/test/sqllogictest/transform/monotonic.slt +++ b/test/sqllogictest/transform/monotonic.slt @@ -7,10 +7,7 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. -# -# Test Common subexpression elimination for Relations. -# PR https://github.com/MaterializeInc/materialize/pull/7715 -# +# Tests the `Monotonic` MIR transform. statement ok CREATE SOURCE counter_src FROM LOAD GENERATOR COUNTER;