Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,10 @@ impl Interpreter for SingleTimeMonotonic<'_> {
_input_key: &Option<Vec<MirScalarExpr>>,
input: Self::Domain,
_exprs: &Vec<MirScalarExpr>,
_func: &TableFunc,
func: &TableFunc,
_mfp: &MapFilterProject,
) -> Self::Domain {
// In a single-time context, we just propagate the monotonicity
// status of the input
input
PhysicallyMonotonic(input.0 && func.preserves_monotonicity())
}

fn join(
Expand Down
1 change: 1 addition & 0 deletions src/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub use interpret::{ColumnSpec, ColumnSpecs, Interpreter, ResultSpec, Trace, Tra
pub use linear::plan::{MfpPlan, SafeMfpPlan};
pub use linear::util::{join_permutations, permutation_for_arrangement};
pub use linear::{MapFilterProject, memoize_expr};
pub use relation::func::REPEAT_ROW_NAME;
pub use relation::func::order_aggregate_datums as order_aggregate_datums_exported_for_benchmarking;
pub use relation::func::{
AggregateFunc, AnalyzedRegex, AnalyzedRegexOpts, CaptureGroupDesc, LagLeadType,
Expand Down
69 changes: 53 additions & 16 deletions src/expr/src/relation/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3280,7 +3280,7 @@ pub fn csv_extract(a: Datum<'_>, n_cols: usize) -> impl Iterator<Item = (Row, Di
})
}

pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
pub fn repeat_row(a: Datum) -> Option<(Row, Diff)> {
let n = a.unwrap_int64();
if n != 0 {
Some((Row::default(), n.into()))
Expand All @@ -3289,6 +3289,22 @@ pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
}
}

pub fn repeat_row_non_negative<'a>(
a: Datum,
) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
let n = a.unwrap_int64();
if n < 0 {
Err(EvalError::InvalidParameterValue(
format!("repeat_row_non_negative got {}", n).into(),
))
} else if n == 0 {
Ok(Box::new(iter::empty()))
} else {
// iterator with 1 element; n goes into the diff
Ok(Box::new(iter::once((Row::default(), n.into()))))
}
}

fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
datums
.chunks(width)
Expand Down Expand Up @@ -3396,7 +3412,16 @@ pub enum TableFunc {
GuardSubquerySize {
column_type: SqlScalarType,
},
Repeat,
/// Repeats the input row the given number of times. Can even repeat a negative number of times,
/// which has some important consequences:
/// - can lead to negative accumulations downstream;
/// - can't be used in `WITH ORDINALITY` and other constructs that are implemented by
/// `TableFunc::WithOrdinality`, e.g., `ROWS FROM`;
/// - output is non-monotonic.
RepeatRow,
/// Same as `RepeatRow`, but errors on a negative count, and thereby avoids the above
/// peculiarities.
RepeatRowNonNegative,
UnnestArray {
el_typ: SqlScalarType,
},
Expand Down Expand Up @@ -3471,7 +3496,7 @@ impl TableFunc {
| TableFunc::GenerateSeriesTimestamp
| TableFunc::GenerateSeriesTimestampTz
| TableFunc::GuardSubquerySize { .. }
| TableFunc::Repeat
| TableFunc::RepeatRowNonNegative
| TableFunc::UnnestArray { .. }
| TableFunc::UnnestList { .. }
| TableFunc::UnnestMap { .. }
Expand All @@ -3481,9 +3506,13 @@ impl TableFunc {
| TableFunc::RegexpMatches => Some(TableFunc::WithOrdinality(WithOrdinality {
inner: Box::new(inner),
})),
// IMPORTANT: Before adding a new table function here, consider negative diffs:
// IMPORTANT: Before adding a new table function above, consider negative diffs:
// `WithOrdinality::eval` will panic if the inner table function emits a negative diff.
TableFunc::WithOrdinality(_) => None,
// (Note that negative diffs in the table function's _input_ don't matter. The table
// function implementation doesn't see the input diffs, so the thing that matters here
// is whether the table function itself can emit a negative diff.)
TableFunc::RepeatRow // can produce negative diffs
| TableFunc::WithOrdinality(_) => None, // no nesting of `WITH ORDINALITY` allowed
}
}
}
Expand Down Expand Up @@ -3566,7 +3595,8 @@ impl TableFunc {
Ok(Box::new([].into_iter()))
}
}
TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
TableFunc::RepeatRow => Ok(Box::new(repeat_row(datums[0]).into_iter())),
TableFunc::RepeatRowNonNegative => repeat_row_non_negative(datums[0]),
TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
Expand Down Expand Up @@ -3682,7 +3712,7 @@ impl TableFunc {
let keys = vec![];
(column_types, keys)
}
TableFunc::Repeat => {
TableFunc::RepeatRow | TableFunc::RepeatRowNonNegative => {
let column_types = vec![];
let keys = vec![];
(column_types, keys)
Expand Down Expand Up @@ -3763,7 +3793,8 @@ impl TableFunc {
TableFunc::GenerateSeriesTimestampTz => 1,
TableFunc::GenerateSubscriptsArray => 1,
TableFunc::GuardSubquerySize { .. } => 1,
TableFunc::Repeat => 0,
TableFunc::RepeatRow => 0,
TableFunc::RepeatRowNonNegative => 0,
TableFunc::UnnestArray { .. } => 1,
TableFunc::UnnestList { .. } => 1,
TableFunc::UnnestMap { .. } => 2,
Expand All @@ -3790,7 +3821,8 @@ impl TableFunc {
| TableFunc::GenerateSubscriptsArray
| TableFunc::RegexpExtract(_)
| TableFunc::CsvExtract(_)
| TableFunc::Repeat
| TableFunc::RepeatRow
| TableFunc::RepeatRowNonNegative
| TableFunc::UnnestArray { .. }
| TableFunc::UnnestList { .. }
| TableFunc::UnnestMap { .. }
Expand Down Expand Up @@ -3821,7 +3853,8 @@ impl TableFunc {
TableFunc::GenerateSeriesTimestamp => true,
TableFunc::GenerateSeriesTimestampTz => true,
TableFunc::GenerateSubscriptsArray => true,
TableFunc::Repeat => false,
TableFunc::RepeatRow => false,
TableFunc::RepeatRowNonNegative => true,
TableFunc::UnnestArray { .. } => true,
TableFunc::UnnestList { .. } => true,
TableFunc::UnnestMap { .. } => true,
Expand Down Expand Up @@ -3852,7 +3885,8 @@ impl fmt::Display for TableFunc {
TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
TableFunc::GuardSubquerySize { .. } => f.write_str("guard_subquery_size"),
TableFunc::Repeat => f.write_str("repeat_row"),
TableFunc::RepeatRow => f.write_str(REPEAT_ROW_NAME),
TableFunc::RepeatRowNonNegative => f.write_str("repeat_row_non_negative"),
TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
Expand Down Expand Up @@ -3886,12 +3920,13 @@ impl WithOrdinality {
.eval(datums, temp_storage)?
.flat_map(move |(mut row, diff)| {
let diff = diff.into_inner();
// WITH ORDINALITY is not well-defined for negative diffs. This is ok, since the
// only table function that can emit negative diffs is `repeat_row`, which is in
// `mz_unsafe`, so users can never call it.
// WITH ORDINALITY is not well-defined for negative diffs. This is ok, and
// `TableFunc::with_ordinality` refuses to wrap such table functions in
// `WithOrdinality` that can emit negative diffs, e.g., `repeat_row`.
//
// (We also don't need to worry about negative diffs in FlatMap's input, because
// the diff of the input of the FlatMap is factored in after we return from here.)
// (Note that we don't need to worry about negative diffs in FlatMap's input,
// because the diff of the input of the FlatMap is factored in after we return from
// here.)
assert!(diff >= 0);
// The ordinals that will be associated with this row.
let mut ordinals = next_ordinal..(next_ordinal + diff);
Expand All @@ -3916,3 +3951,5 @@ impl WithOrdinality {
Ok(Box::new(it))
}
}

pub const REPEAT_ROW_NAME: &str = "repeat_row";
3 changes: 2 additions & 1 deletion src/pgrepr-consts/src/oid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub const FUNC_MZ_LOGICAL_TIMESTAMP_OID: u32 = 16_409;
pub const FUNC_MZ_RENDER_TYPMOD_OID: u32 = 16_410;
pub const FUNC_MZ_VERSION_OID: u32 = 16_411;
pub const FUNC_REGEXP_EXTRACT_OID: u32 = 16_412;
pub const FUNC_REPEAT_OID: u32 = 16_413;
pub const FUNC_REPEAT_ROW_OID: u32 = 16_413;
pub const FUNC_ROUND_F32_OID: u32 = 16_414;
pub const FUNC_UNNEST_LIST_OID: u32 = 16_416;
pub const OP_CONCAT_ELEMENY_LIST_OID: u32 = 16_417;
Expand Down Expand Up @@ -792,3 +792,4 @@ pub const VIEW_MZ_MCP_DATA_PRODUCT_DETAILS_OID: u32 = 17071;
pub const VIEW_MZ_BUILTIN_MATERIALIZED_VIEWS_OID: u32 = 17072;
pub const FUNC_PARSE_CATALOG_CREATE_SQL_OID: u32 = 17073;
pub const FUNC_REDACT_SQL_OID: u32 = 17074;
pub const FUNC_REPEAT_ROW_NON_NEGATIVE_OID: u32 = 17075;
26 changes: 19 additions & 7 deletions src/sql/src/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4566,7 +4566,7 @@ pub static MZ_CATALOG_BUILTINS: LazyLock<BTreeMap<&'static str, Func>> = LazyLoc
},
"list_n_layers" => Scalar {
vec![ListAny] => Operation::unary(|ecx, e| {
ecx.require_feature_flag(&crate::session::vars::ENABLE_LIST_N_LAYERS)?;
ecx.require_feature_flag(&vars::ENABLE_LIST_N_LAYERS)?;
let d = ecx.scalar_type(&e).unwrap_list_n_layers();
match i32::try_from(d) {
Ok(d) => Ok(HirScalarExpr::literal(Datum::Int32(d), SqlScalarType::Int32)),
Expand All @@ -4581,7 +4581,7 @@ pub static MZ_CATALOG_BUILTINS: LazyLock<BTreeMap<&'static str, Func>> = LazyLoc
},
"list_length_max" => Scalar {
vec![ListAny, Plain(SqlScalarType::Int64)] => Operation::binary(|ecx, lhs, rhs| {
ecx.require_feature_flag(&crate::session::vars::ENABLE_LIST_LENGTH_MAX)?;
ecx.require_feature_flag(&vars::ENABLE_LIST_LENGTH_MAX)?;
let max_layer = ecx.scalar_type(&lhs).unwrap_list_n_layers();
Ok(lhs.call_binary(rhs, BinaryFunc::from(func::ListLengthMax { max_layer })))
}) => Int32, oid::FUNC_LIST_LENGTH_MAX_OID;
Expand All @@ -4593,7 +4593,7 @@ pub static MZ_CATALOG_BUILTINS: LazyLock<BTreeMap<&'static str, Func>> = LazyLoc
},
"list_remove" => Scalar {
vec![ListAnyCompatible, ListElementAnyCompatible] => Operation::binary(|ecx, lhs, rhs| {
ecx.require_feature_flag(&crate::session::vars::ENABLE_LIST_REMOVE)?;
ecx.require_feature_flag(&vars::ENABLE_LIST_REMOVE)?;
Ok(lhs.call_binary(rhs, func::ListRemove))
}) => ListAnyCompatible, oid::FUNC_LIST_REMOVE_OID;
},
Expand Down Expand Up @@ -4733,17 +4733,29 @@ pub static MZ_CATALOG_BUILTINS: LazyLock<BTreeMap<&'static str, Func>> = LazyLoc
})
}) => ReturnType::set_of(RecordAny), oid::FUNC_REGEXP_EXTRACT_OID;
},
"repeat_row" => Table {
mz_expr::REPEAT_ROW_NAME => Table {
params!(Int64) => Operation::unary(move |ecx, n| {
ecx.require_feature_flag(&crate::session::vars::ENABLE_REPEAT_ROW)?;
ecx.require_feature_flag(&vars::ENABLE_REPEAT_ROW)?;
Ok(TableFuncPlan {
imp: TableFuncImpl::CallTable {
func: TableFunc::Repeat,
func: TableFunc::RepeatRow,
exprs: vec![n],
},
column_names: vec![]
})
}) => ReturnType::none(true), oid::FUNC_REPEAT_OID;
}) => ReturnType::none(true), oid::FUNC_REPEAT_ROW_OID;
},
"repeat_row_non_negative" => Table {
params!(Int64) => Operation::unary(move |ecx, n| {
ecx.require_feature_flag(&vars::ENABLE_REPEAT_ROW_NON_NEGATIVE)?;
Ok(TableFuncPlan {
imp: TableFuncImpl::CallTable {
func: TableFunc::RepeatRowNonNegative,
exprs: vec![n],
},
column_names: vec![]
})
}) => ReturnType::none(true), oid::FUNC_REPEAT_ROW_NON_NEGATIVE_OID;
},
"seahash" => Scalar {
params!(String) => UnaryFunc::SeahashString(func::SeahashString)
Expand Down
25 changes: 23 additions & 2 deletions src/sql/src/plan/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ use mz_expr::func::variadic::{
};
use mz_expr::virtual_syntax::AlgExcept;
use mz_expr::{
Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, TableFunc,
func as expr_func,
Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, REPEAT_ROW_NAME, RowSetFinishing,
TableFunc, func as expr_func,
};
use mz_ore::assert_none;
use mz_ore::collections::CollectionExt;
Expand All @@ -66,6 +66,7 @@ use mz_repr::adt::char::CharLength;
use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
use mz_repr::adt::timestamp::TimestampPrecision;
use mz_repr::adt::varchar::VarCharMaxLength;
use mz_repr::namespaces::MZ_CATALOG_SCHEMA;
use mz_repr::{
CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector,
ReprColumnType, Row, RowArena, SqlColumnType, SqlRelationType, SqlScalarType,
Expand Down Expand Up @@ -2826,6 +2827,14 @@ fn plan_scalar_table_funcs(
}
return Ok((expr, scope));
}
if table_funcs.keys().any(is_repeat_row) {
// Note: Would be also caught by WITH ORDINALITY checking for `repeat_row`, but then the
// error message would be misleading, because it would refer to WITH ORDINALITY.
bail_unsupported!(format!(
"{} in a SELECT clause with multiple table functions",
REPEAT_ROW_NAME
));
}
// Otherwise, plan as usual, emulating the ROWS FROM behavior
let (expr, mut scope, num_cols) =
plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
Expand Down Expand Up @@ -3111,6 +3120,14 @@ fn plan_rows_from(
alias: Option<&TableAlias>,
with_ordinality: bool,
) -> Result<(HirRelationExpr, Scope), PlanError> {
// The `repeat_row` function is not supported in ROWS FROM.
if functions.iter().any(is_repeat_row) {
// Note: Would be also caught by WITH ORDINALITY checking for `repeat_row`, but then the
// error message would be misleading, because it would refer to WITH ORDINALITY instead of
// ROWS FROM.
bail_unsupported!(format!("{} in ROWS FROM", REPEAT_ROW_NAME));
}

// If there's only a single table function, planning proceeds as if `ROWS
// FROM` hadn't been written at all.
if let [function] = functions {
Expand Down Expand Up @@ -3155,6 +3172,10 @@ fn plan_rows_from(
Ok((expr, scope))
}

fn is_repeat_row(f: &Function<Aug>) -> bool {
f.name.full_name_str().as_str() == format!("{}.{}", MZ_CATALOG_SCHEMA, REPEAT_ROW_NAME)
}

/// Plans an expression coalescing multiple table functions. Each table
/// function is followed by its row ordinality. The entire expression is
/// followed by the coalesced row ordinality.
Expand Down
6 changes: 6 additions & 0 deletions src/sql/src/session/vars/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1880,6 +1880,12 @@ feature_flags!(
default: false,
enable_for_item_parsing: true,
},
{
name: enable_repeat_row_non_negative,
desc: "the repeat_row_non_negative function",
default: false,
enable_for_item_parsing: true,
},
{
name: enable_replica_targeted_materialized_views,
desc: "replica-targeted materialized views",
Expand Down
Loading
Loading