Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 56 additions & 40 deletions src/compute-types/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,36 +490,9 @@ impl Plan {
Self::refine_union_negate_consolidation(&mut dataflow);
}

if dataflow.is_single_time() {
Self::refine_single_time_operator_selection(&mut dataflow);

// The relaxation of the `must_consolidate` flag performs an LIR-based
// analysis and transform under checked recursion. By a similar argument
// made in `from_mir`, we do not expect the recursion limit to be hit.
// However, if that happens, we propagate an error to the caller.
// To apply the transform, we first obtain monotonic source and index
// global IDs and add them to a `TransformConfig` instance.
let monotonic_ids = dataflow
.source_imports
.iter()
.filter_map(|(id, source_import)| source_import.monotonic.then_some(*id))
.chain(
dataflow
.index_imports
.iter()
.filter_map(|(_id, index_import)| {
if index_import.monotonic {
Some(index_import.desc.on_id)
} else {
None
}
}),
)
.collect::<BTreeSet<_>>();
Self::refine_single_time_operator_selection(&mut dataflow);

let config = TransformConfig { monotonic_ids };
Self::refine_single_time_consolidation(&mut dataflow, &config)?;
}
Self::refine_single_time_consolidation(&mut dataflow)?;

soft_assert_eq_no_log!(dataflow.check_invariants(), Ok(()));

Expand Down Expand Up @@ -639,18 +612,36 @@ impl Plan {
mz_repr::explain::trace_plan(dataflow);
}

/// Refines the plans of objects to be built as part of `dataflow` to take advantage
/// of monotonic operators if the dataflow refers to a single-time, i.e., is for a
/// one-shot SELECT query.
/// If the dataflow refers to a single time, i.e., is for a one-shot SELECT query, this function
/// refines the `dataflow` to use monotonic operators.
///
/// Note that we set the `must_consolidate` flag on the monotonic operators, i.e., we don't
/// assume physical monotonicity here. Reasoning about physical monotonicity and refining the
/// `must_consolidate` flag is the responsibility of `refine_single_time_consolidation`.
///
/// Note that, strictly speaking, choosing monotonic operators is valid only by assuming that
/// - all inputs of single-time dataflows are logically (i.e., after consolidation) monotonic;
/// - it's not possible to introduce logically non-monotonic collections inside a single-time
/// dataflow when the inputs are logically monotonic.
///
/// This assumption is not true when there are negative accumulations, which can be introduced
/// by bugs (in Materialize or external systems), or by user error when using `repeat_row`.
/// In such cases, the introduced monotonic operators won't produce correct results. This is
/// considered ok, because:
/// - In most cases, monotonic operators will detect non-monotonic input, and cleanly error out.
/// - In some cases, a monotonic operator might produce garbage output when given non-monotonic
/// input. Even this is considered ok, because we generally don't expect the system to always
/// detect negative accumulations (because this seems to be ~impossible to achieve without
/// adding further resource usage to various operators, not just monotonic operators).
#[mz_ore::instrument(
target = "optimizer",
level = "debug",
fields(path.segment = "refine_single_time_operator_selection")
)]
fn refine_single_time_operator_selection(dataflow: &mut DataflowDescription<Self>) {
// We should only reach here if we have a one-shot SELECT query, i.e.,
// a single-time dataflow.
assert!(dataflow.is_single_time());
if !dataflow.is_single_time() {
return;
}

// Upgrade single-time plans to monotonic.
for build_desc in dataflow.objects_to_build.iter_mut() {
Expand Down Expand Up @@ -698,16 +689,41 @@ impl Plan {
)]
fn refine_single_time_consolidation(
dataflow: &mut DataflowDescription<Self>,
config: &TransformConfig,
) -> Result<(), String> {
// We should only reach here if we have a one-shot SELECT query, i.e.,
// a single-time dataflow.
assert!(dataflow.is_single_time());
if !dataflow.is_single_time() {
return Ok(());
}

// The relaxation of the `must_consolidate` flag performs an LIR-based
// analysis and transform under checked recursion. By a similar argument
// made in `from_mir`, we do not expect the recursion limit to be hit.
// However, if that happens, we propagate an error to the caller.
// To apply the transform, we first obtain monotonic source and index
// global IDs and add them to a `TransformConfig` instance.
let monotonic_ids = dataflow
.source_imports
.iter()
.filter_map(|(id, source_import)| source_import.monotonic.then_some(*id))
.chain(
dataflow
.index_imports
.iter()
.filter_map(|(_id, index_import)| {
if index_import.monotonic {
Some(index_import.desc.on_id)
} else {
None
}
}),
)
.collect::<BTreeSet<_>>();

let config = TransformConfig { monotonic_ids };

let transform = transform::RelaxMustConsolidate;
for build_desc in dataflow.objects_to_build.iter_mut() {
transform
.transform(config, &mut build_desc.plan)
.transform(&config, &mut build_desc.plan)
.map_err(|_| "Maximum recursion limit error in consolidation relaxation.")?;
}
mz_repr::explain::trace_plan(dataflow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,10 @@ impl Interpreter for SingleTimeMonotonic<'_> {
_input_key: &Option<Vec<MirScalarExpr>>,
input: Self::Domain,
_exprs: &Vec<MirScalarExpr>,
_func: &TableFunc,
func: &TableFunc,
_mfp: &MapFilterProject,
) -> Self::Domain {
// In a single-time context, we just propagate the monotonicity
// status of the input
input
PhysicallyMonotonic(input.0 && func.preserves_monotonicity())
}

fn join(
Expand Down
1 change: 1 addition & 0 deletions src/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub use interpret::{ColumnSpec, ColumnSpecs, Interpreter, ResultSpec, Trace, Tra
pub use linear::plan::{MfpPlan, SafeMfpPlan};
pub use linear::util::{join_permutations, permutation_for_arrangement};
pub use linear::{MapFilterProject, memoize_expr};
pub use relation::func::REPEAT_ROW_NAME;
pub use relation::func::order_aggregate_datums as order_aggregate_datums_exported_for_benchmarking;
pub use relation::func::{
AggregateFunc, AnalyzedRegex, AnalyzedRegexOpts, CaptureGroupDesc, LagLeadType,
Expand Down
69 changes: 53 additions & 16 deletions src/expr/src/relation/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3280,7 +3280,7 @@ pub fn csv_extract(a: Datum<'_>, n_cols: usize) -> impl Iterator<Item = (Row, Di
})
}

pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
pub fn repeat_row(a: Datum) -> Option<(Row, Diff)> {
let n = a.unwrap_int64();
if n != 0 {
Some((Row::default(), n.into()))
Expand All @@ -3289,6 +3289,22 @@ pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
}
}

/// Like `repeat_row`, but rejects negative repetition counts.
///
/// Returns an iterator that yields the empty row with a diff equal to the
/// given count, an empty iterator for a count of zero, or an
/// `EvalError::InvalidParameterValue` for a negative count. Because the diff
/// is guaranteed non-negative, this variant never introduces negative
/// accumulations downstream.
pub fn repeat_row_non_negative<'a>(
    a: Datum,
) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
    let count = a.unwrap_int64();
    match count {
        n if n < 0 => Err(EvalError::InvalidParameterValue(
            format!("repeat_row_non_negative got {}", n).into(),
        )),
        0 => Ok(Box::new(iter::empty())),
        // A single element; the repetition count is carried in the diff.
        n => Ok(Box::new(iter::once((Row::default(), n.into())))),
    }
}

fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
datums
.chunks(width)
Expand Down Expand Up @@ -3396,7 +3412,16 @@ pub enum TableFunc {
GuardSubquerySize {
column_type: SqlScalarType,
},
Repeat,
/// Repeats the input row the given number of times. Can even repeat a negative number of times,
/// which has some important consequences:
/// - can lead to negative accumulations downstream;
/// - can't be used in `WITH ORDINALITY` and other constructs that are implemented by
/// `TableFunc::WithOrdinality`, e.g., `ROWS FROM`;
/// - output is non-monotonic.
RepeatRow,
/// Same as `RepeatRow`, but errors on a negative count, and thereby avoids the above
/// peculiarities.
RepeatRowNonNegative,
UnnestArray {
el_typ: SqlScalarType,
},
Expand Down Expand Up @@ -3471,7 +3496,7 @@ impl TableFunc {
| TableFunc::GenerateSeriesTimestamp
| TableFunc::GenerateSeriesTimestampTz
| TableFunc::GuardSubquerySize { .. }
| TableFunc::Repeat
| TableFunc::RepeatRowNonNegative
| TableFunc::UnnestArray { .. }
| TableFunc::UnnestList { .. }
| TableFunc::UnnestMap { .. }
Expand All @@ -3481,9 +3506,13 @@ impl TableFunc {
| TableFunc::RegexpMatches => Some(TableFunc::WithOrdinality(WithOrdinality {
inner: Box::new(inner),
})),
// IMPORTANT: Before adding a new table function here, consider negative diffs:
// IMPORTANT: Before adding a new table function above, consider negative diffs:
// `WithOrdinality::eval` will panic if the inner table function emits a negative diff.
TableFunc::WithOrdinality(_) => None,
// (Note that negative diffs in the table function's _input_ don't matter. The table
// function implementation doesn't see the input diffs, so the thing that matters here
// is whether the table function itself can emit a negative diff.)
TableFunc::RepeatRow // can produce negative diffs
| TableFunc::WithOrdinality(_) => None, // no nesting of `WITH ORDINALITY` allowed
}
}
}
Expand Down Expand Up @@ -3566,7 +3595,8 @@ impl TableFunc {
Ok(Box::new([].into_iter()))
}
}
TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
TableFunc::RepeatRow => Ok(Box::new(repeat_row(datums[0]).into_iter())),
TableFunc::RepeatRowNonNegative => repeat_row_non_negative(datums[0]),
TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
Expand Down Expand Up @@ -3682,7 +3712,7 @@ impl TableFunc {
let keys = vec![];
(column_types, keys)
}
TableFunc::Repeat => {
TableFunc::RepeatRow | TableFunc::RepeatRowNonNegative => {
let column_types = vec![];
let keys = vec![];
(column_types, keys)
Expand Down Expand Up @@ -3763,7 +3793,8 @@ impl TableFunc {
TableFunc::GenerateSeriesTimestampTz => 1,
TableFunc::GenerateSubscriptsArray => 1,
TableFunc::GuardSubquerySize { .. } => 1,
TableFunc::Repeat => 0,
TableFunc::RepeatRow => 0,
TableFunc::RepeatRowNonNegative => 0,
TableFunc::UnnestArray { .. } => 1,
TableFunc::UnnestList { .. } => 1,
TableFunc::UnnestMap { .. } => 2,
Expand All @@ -3790,7 +3821,8 @@ impl TableFunc {
| TableFunc::GenerateSubscriptsArray
| TableFunc::RegexpExtract(_)
| TableFunc::CsvExtract(_)
| TableFunc::Repeat
| TableFunc::RepeatRow
| TableFunc::RepeatRowNonNegative
| TableFunc::UnnestArray { .. }
| TableFunc::UnnestList { .. }
| TableFunc::UnnestMap { .. }
Expand Down Expand Up @@ -3821,7 +3853,8 @@ impl TableFunc {
TableFunc::GenerateSeriesTimestamp => true,
TableFunc::GenerateSeriesTimestampTz => true,
TableFunc::GenerateSubscriptsArray => true,
TableFunc::Repeat => false,
TableFunc::RepeatRow => false,
TableFunc::RepeatRowNonNegative => true,
TableFunc::UnnestArray { .. } => true,
TableFunc::UnnestList { .. } => true,
TableFunc::UnnestMap { .. } => true,
Expand Down Expand Up @@ -3852,7 +3885,8 @@ impl fmt::Display for TableFunc {
TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
TableFunc::GuardSubquerySize { .. } => f.write_str("guard_subquery_size"),
TableFunc::Repeat => f.write_str("repeat_row"),
TableFunc::RepeatRow => f.write_str(REPEAT_ROW_NAME),
TableFunc::RepeatRowNonNegative => f.write_str("repeat_row_non_negative"),
TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
Expand Down Expand Up @@ -3886,12 +3920,13 @@ impl WithOrdinality {
.eval(datums, temp_storage)?
.flat_map(move |(mut row, diff)| {
let diff = diff.into_inner();
// WITH ORDINALITY is not well-defined for negative diffs. This is ok, since the
// only table function that can emit negative diffs is `repeat_row`, which is in
// `mz_unsafe`, so users can never call it.
// WITH ORDINALITY is not well-defined for negative diffs. This is ok, because
// `TableFunc::with_ordinality` refuses to wrap table functions that can emit
// negative diffs (e.g., `repeat_row`) in `WithOrdinality`.
//
// (We also don't need to worry about negative diffs in FlatMap's input, because
// the diff of the input of the FlatMap is factored in after we return from here.)
// (Note that we don't need to worry about negative diffs in FlatMap's input,
// because the diff of the input of the FlatMap is factored in after we return from
// here.)
assert!(diff >= 0);
// The ordinals that will be associated with this row.
let mut ordinals = next_ordinal..(next_ordinal + diff);
Expand All @@ -3916,3 +3951,5 @@ impl WithOrdinality {
Ok(Box::new(it))
}
}

/// The SQL-level name of the `repeat_row` table function, shared between the
/// `Display` impl for `TableFunc::RepeatRow` and the SQL builtin catalog.
pub const REPEAT_ROW_NAME: &str = "repeat_row";
3 changes: 2 additions & 1 deletion src/pgrepr-consts/src/oid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub const FUNC_MZ_LOGICAL_TIMESTAMP_OID: u32 = 16_409;
pub const FUNC_MZ_RENDER_TYPMOD_OID: u32 = 16_410;
pub const FUNC_MZ_VERSION_OID: u32 = 16_411;
pub const FUNC_REGEXP_EXTRACT_OID: u32 = 16_412;
pub const FUNC_REPEAT_OID: u32 = 16_413;
pub const FUNC_REPEAT_ROW_OID: u32 = 16_413;
pub const FUNC_ROUND_F32_OID: u32 = 16_414;
pub const FUNC_UNNEST_LIST_OID: u32 = 16_416;
pub const OP_CONCAT_ELEMENY_LIST_OID: u32 = 16_417;
Expand Down Expand Up @@ -792,3 +792,4 @@ pub const VIEW_MZ_MCP_DATA_PRODUCT_DETAILS_OID: u32 = 17071;
pub const VIEW_MZ_BUILTIN_MATERIALIZED_VIEWS_OID: u32 = 17072;
pub const FUNC_PARSE_CATALOG_CREATE_SQL_OID: u32 = 17073;
pub const FUNC_REDACT_SQL_OID: u32 = 17074;
pub const FUNC_REPEAT_ROW_NON_NEGATIVE_OID: u32 = 17075;
26 changes: 19 additions & 7 deletions src/sql/src/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4566,7 +4566,7 @@ pub static MZ_CATALOG_BUILTINS: LazyLock<BTreeMap<&'static str, Func>> = LazyLoc
},
"list_n_layers" => Scalar {
vec![ListAny] => Operation::unary(|ecx, e| {
ecx.require_feature_flag(&crate::session::vars::ENABLE_LIST_N_LAYERS)?;
ecx.require_feature_flag(&vars::ENABLE_LIST_N_LAYERS)?;
let d = ecx.scalar_type(&e).unwrap_list_n_layers();
match i32::try_from(d) {
Ok(d) => Ok(HirScalarExpr::literal(Datum::Int32(d), SqlScalarType::Int32)),
Expand All @@ -4581,7 +4581,7 @@ pub static MZ_CATALOG_BUILTINS: LazyLock<BTreeMap<&'static str, Func>> = LazyLoc
},
"list_length_max" => Scalar {
vec![ListAny, Plain(SqlScalarType::Int64)] => Operation::binary(|ecx, lhs, rhs| {
ecx.require_feature_flag(&crate::session::vars::ENABLE_LIST_LENGTH_MAX)?;
ecx.require_feature_flag(&vars::ENABLE_LIST_LENGTH_MAX)?;
let max_layer = ecx.scalar_type(&lhs).unwrap_list_n_layers();
Ok(lhs.call_binary(rhs, BinaryFunc::from(func::ListLengthMax { max_layer })))
}) => Int32, oid::FUNC_LIST_LENGTH_MAX_OID;
Expand All @@ -4593,7 +4593,7 @@ pub static MZ_CATALOG_BUILTINS: LazyLock<BTreeMap<&'static str, Func>> = LazyLoc
},
"list_remove" => Scalar {
vec![ListAnyCompatible, ListElementAnyCompatible] => Operation::binary(|ecx, lhs, rhs| {
ecx.require_feature_flag(&crate::session::vars::ENABLE_LIST_REMOVE)?;
ecx.require_feature_flag(&vars::ENABLE_LIST_REMOVE)?;
Ok(lhs.call_binary(rhs, func::ListRemove))
}) => ListAnyCompatible, oid::FUNC_LIST_REMOVE_OID;
},
Expand Down Expand Up @@ -4733,17 +4733,29 @@ pub static MZ_CATALOG_BUILTINS: LazyLock<BTreeMap<&'static str, Func>> = LazyLoc
})
}) => ReturnType::set_of(RecordAny), oid::FUNC_REGEXP_EXTRACT_OID;
},
"repeat_row" => Table {
mz_expr::REPEAT_ROW_NAME => Table {
params!(Int64) => Operation::unary(move |ecx, n| {
ecx.require_feature_flag(&crate::session::vars::ENABLE_REPEAT_ROW)?;
ecx.require_feature_flag(&vars::ENABLE_REPEAT_ROW)?;
Ok(TableFuncPlan {
imp: TableFuncImpl::CallTable {
func: TableFunc::Repeat,
func: TableFunc::RepeatRow,
exprs: vec![n],
},
column_names: vec![]
})
}) => ReturnType::none(true), oid::FUNC_REPEAT_OID;
}) => ReturnType::none(true), oid::FUNC_REPEAT_ROW_OID;
},
"repeat_row_non_negative" => Table {
params!(Int64) => Operation::unary(move |ecx, n| {
ecx.require_feature_flag(&vars::ENABLE_REPEAT_ROW_NON_NEGATIVE)?;
Ok(TableFuncPlan {
imp: TableFuncImpl::CallTable {
func: TableFunc::RepeatRowNonNegative,
exprs: vec![n],
},
column_names: vec![]
})
}) => ReturnType::none(true), oid::FUNC_REPEAT_ROW_NON_NEGATIVE_OID;
},
"seahash" => Scalar {
params!(String) => UnaryFunc::SeahashString(func::SeahashString)
Expand Down
Loading
Loading