Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
dbf2aa5
add lambda support
gstvg Nov 22, 2025
fa4a8fb
add lambdas: None to existing ScalarFunctionArgs in tests/benches
gstvg Nov 22, 2025
b18d214
simplify lambda support
gstvg Dec 15, 2025
d844b2d
rename LambdaColumn to LambdaVariable
gstvg Dec 19, 2025
e1921eb
feat: add LambdaUDF
gstvg Feb 23, 2026
1f19c64
feat: remove lambda support for ScalarUDF
gstvg Feb 23, 2026
570cc53
temporarily add pr description as DOC.md
gstvg Mar 1, 2026
83dfbdd
add lambda note in substrait consumer
gstvg Mar 8, 2026
34137e1
add LambdaSignature
gstvg Mar 8, 2026
3ded115
improve lambda type coercion
gstvg Mar 8, 2026
82930ec
lambda function type coercion: stop using unstable Iterator::eq_by
gstvg Mar 9, 2026
86d5999
remove signature section from DOC.md
gstvg Mar 9, 2026
60cabc0
polish lambda impl
gstvg Mar 15, 2026
41152c3
minor improvoments
gstvg Mar 15, 2026
2be9e54
Merge branch 'main' into lambda4
gstvg Mar 15, 2026
90eb08f
improve lambdas
gstvg Mar 17, 2026
d874db7
cargo fmt
gstvg Mar 17, 2026
a59ffe8
simplify LambdaUDF coerce_value_types
gstvg Mar 17, 2026
cd22c04
remove DOC.md
gstvg Mar 18, 2026
0188d40
add physical lambda function comments
gstvg Mar 18, 2026
6f2c92b
remove secondary lambda features to be added later
gstvg Mar 18, 2026
b3bdc48
fix removal of lambda features
gstvg Mar 18, 2026
9728a2e
fix typo
gstvg Mar 18, 2026
811aa0a
Merge branch 'main' of https://github.com/apache/datafusion into lambda4
gstvg Mar 18, 2026
f724ef5
remove paste! from lambda macros
gstvg Mar 19, 2026
5380884
fix lambda sqllogictests
gstvg Mar 19, 2026
d75dfe3
improve Expr::Lambda docs
gstvg Mar 23, 2026
a241a51
add clarifying comment on lambda type coercion
gstvg Mar 23, 2026
547c148
simplify lambda type coercion
gstvg Mar 23, 2026
6ae73cb
handle null values in array_transform
gstvg Mar 24, 2026
39db62b
simplify LambdaUDF::lambdas_parameters
gstvg Mar 24, 2026
7a7d371
cargo fmt
gstvg Mar 24, 2026
83fb18d
add tip on LambdaUDF::lambdas_parameters docs to LambdaFunction helper
gstvg Mar 25, 2026
e76ff25
minor fixes
gstvg Mar 25, 2026
51dfa81
Merge branch 'main' of https://github.com/apache/datafusion into lambda4
gstvg Mar 25, 2026
6c32ef8
minor fixes
gstvg Mar 25, 2026
27a3e24
simplify array_transform tests
gstvg Mar 27, 2026
93e66f7
evaluate LambdaVariable by index instead of name
gstvg Mar 27, 2026
a9d0e6c
add LambdaUDF::clean_null_values
gstvg Mar 29, 2026
69c44fc
rename LambdaUDF::lambdas_parameters to lambda_parameters
gstvg Mar 29, 2026
66839d3
rename LambdaFunction to HigherOrderFunction, LambdaUDF to HigherOrde…
gstvg Mar 29, 2026
f0cf8d7
fix typo
gstvg Mar 30, 2026
1b7f4bf
handle CaseWhen optimization
gstvg Mar 30, 2026
6d7c52a
remove HigherOrderUDF::as_any
gstvg Mar 30, 2026
7255820
add TaskContext::higher_order_functions
gstvg Mar 30, 2026
96d8ad2
return DataType::Null for Expr::Lambda ExprSchemable::get_type
gstvg Apr 10, 2026
474b22e
add higher order function type coercion tests
gstvg Apr 13, 2026
954a360
avoid clone at HigherOrderFunctionExpr::with_nullable
gstvg Apr 13, 2026
5c2c72a
include index in physical LambdaVariable formatting
gstvg Apr 13, 2026
2259f70
improve physical LambdaVariable
gstvg Apr 13, 2026
8571853
fix typo udf to udhof at HigherOrderFunctionExpr
gstvg Apr 13, 2026
3c9fe39
include higher order functions in scalar function sql user guide docs
gstvg Apr 14, 2026
3486e53
Merge branch 'main' of https://github.com/apache/datafusion into lambda4
gstvg Apr 14, 2026
5c0b41d
Merge branch 'main' into lambda4
gstvg Apr 14, 2026
ca260a7
handle wrapped lambdas
gstvg Apr 15, 2026
5a6f470
Merge branch 'main' of https://github.com/apache/datafusion into lambda4
gstvg Apr 15, 2026
98d365a
Merge branch 'main' of https://github.com/apache/datafusion into lambda4
gstvg Apr 15, 2026
9ad1d6f
remove PhysicalExpr::as_any from lambda impl
gstvg Apr 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
| Expr::GroupingSet(_)
| Expr::Case(_) => Ok(TreeNodeRecursion::Continue),
| Expr::Case(_)
| Expr::Lambda(_)
| Expr::LambdaColumn(_) => Ok(TreeNodeRecursion::Continue),

Expr::ScalarFunction(scalar_function) => {
match scalar_function.func.signature().volatility {
Expand Down
125 changes: 123 additions & 2 deletions datafusion/common/src/utils/mod.rs
Copy link
Copy Markdown
Member

@rluvaton rluvaton Apr 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try to create PR to arrow with my utils to handle nulls and list sliced but don't wait for me

Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,26 @@ pub mod memory;
pub mod proxy;
pub mod string_utils;

use crate::error::{_exec_datafusion_err, _internal_datafusion_err, _internal_err};
use crate::error::{
_exec_datafusion_err, _exec_err, _internal_datafusion_err, _internal_err,
};
use crate::{Result, ScalarValue};
use arrow::array::{
cast::AsArray, Array, ArrayRef, FixedSizeListArray, LargeListArray, ListArray,
OffsetSizeTrait,
};
use arrow::array::{ArrowPrimitiveType, PrimitiveArray};
use arrow::buffer::OffsetBuffer;
use arrow::compute::{partition, SortColumn, SortOptions};
use arrow::datatypes::{DataType, Field, SchemaRef};
use arrow::datatypes::{
ArrowNativeType, DataType, Field, Int32Type, Int64Type, SchemaRef,
};
#[cfg(feature = "sql")]
use sqlparser::{ast::Ident, dialect::GenericDialect, parser::Parser};
use std::borrow::{Borrow, Cow};
use std::cmp::{min, Ordering};
use std::collections::HashSet;
use std::iter::repeat_n;
use std::num::NonZero;
use std::ops::Range;
use std::sync::Arc;
Expand Down Expand Up @@ -939,6 +945,121 @@ pub fn take_function_args<const N: usize, T>(
})
}

/// [0, 2, 2, 5, 6] -> [0, 0, 2, 2, 2, 3]
pub fn make_list_array_indices<T: ArrowPrimitiveType>(
offsets: &OffsetBuffer<T::Native>,
) -> PrimitiveArray<T> {
let mut indices = Vec::with_capacity(
offsets.last().unwrap().as_usize() - offsets.first().unwrap().as_usize(),
);

for (i, (&start, &end)) in std::iter::zip(&offsets[..], &offsets[1..]).enumerate() {
indices.extend(repeat_n(
T::Native::usize_as(i),
end.as_usize() - start.as_usize(),
));
}

PrimitiveArray::new(indices.into(), None)
}

/// [0, 2, 2, 5, 6] -> [0, 1, 0, 1, 2, 0]
pub fn make_list_element_indices<T: ArrowPrimitiveType>(
offsets: &OffsetBuffer<T::Native>,
) -> PrimitiveArray<T> {
let mut indices =
Vec::with_capacity(offsets.last().unwrap().as_usize() - offsets[0].as_usize());

for (&start, &end) in std::iter::zip(&offsets[..], &offsets[1..]) {
indices.extend(
(0..end.as_usize() - start.as_usize()).map(|i| T::Native::usize_as(i)),
);
}

PrimitiveArray::new(indices.into(), None)
}

/// (3, 2) -> [0, 0, 1, 1, 2, 2]
pub fn make_fsl_array_indices(
list_size: i32,
array_len: usize,
) -> PrimitiveArray<Int32Type> {
let mut indices = Vec::with_capacity(list_size as usize * array_len);

for i in 0..array_len {
indices.extend(repeat_n(i as i32, list_size as usize));
}

PrimitiveArray::new(indices.into(), None)
}

/// (3, 2) -> [0, 1, 0, 1, 0, 1]
pub fn make_fsl_element_indices(
list_size: i32,
array_len: usize,
) -> PrimitiveArray<Int32Type> {
let mut indices = Vec::with_capacity(list_size as usize * array_len);

if array_len > 0 {
indices.extend((0..list_size as usize).map(|j| j as i32));

for _ in 1..array_len {
indices.extend_from_within(0..list_size as usize);
}
}

PrimitiveArray::new(indices.into(), None)
}

pub fn list_values(array: &dyn Array) -> Result<&ArrayRef> {
match array.data_type() {
DataType::List(_) => Ok(array.as_list::<i32>().values()),
DataType::LargeList(_) => Ok(array.as_list::<i64>().values()),
DataType::FixedSizeList(_, _) => Ok(array.as_fixed_size_list().values()),
other => _exec_err!("expected list, got {other}"),
}
}

pub fn list_indices(array: &dyn Array) -> Result<ArrayRef> {
match array.data_type() {
DataType::List(_) => Ok(Arc::new(make_list_array_indices::<Int32Type>(
array.as_list().offsets(),
))),
DataType::LargeList(_) => Ok(Arc::new(make_list_array_indices::<Int64Type>(
array.as_list().offsets(),
))),
DataType::FixedSizeList(_, _) => {
let fixed_size_list = array.as_fixed_size_list();

Ok(Arc::new(make_fsl_array_indices(
fixed_size_list.value_length(),
fixed_size_list.len(),
)))
}
other => _exec_err!("expected list, got {other}"),
}
}

pub fn elements_indices(array: &dyn Array) -> Result<ArrayRef> {
match array.data_type() {
DataType::List(_) => Ok(Arc::new(make_list_element_indices::<Int32Type>(
array.as_list::<i32>().offsets(),
))),
DataType::LargeList(_) => Ok(Arc::new(make_list_element_indices::<Int64Type>(
array.as_list::<i64>().offsets(),
))),
DataType::FixedSizeList(_, _) => {
let fixed_size_list = array.as_fixed_size_list();

Ok(Arc::new(make_fsl_element_indices(
fixed_size_list.value_length(),
fixed_size_list.len(),
)))
}
other => _exec_err!("expected list, got {other}"),
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
73 changes: 72 additions & 1 deletion datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,30 @@ pub enum Expr {
OuterReferenceColumn(FieldRef, Column),
/// Unnest expression
Unnest(Unnest),
/// Lambda expression
Lambda(Lambda),
LambdaColumn(LambdaColumn),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gstvg Would LambdaVariable be a more accurate name here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, done in d844b2d

}

#[derive(Clone, PartialEq, PartialOrd, Eq, Debug, Hash)]
pub struct LambdaColumn {
pub name: String,
pub field: FieldRef,
pub spans: Spans,
}

impl LambdaColumn {
pub fn new(name: String, field: FieldRef) -> Self {
Self {
name,
field,
spans: Spans::new(),
}
}

pub fn spans_mut(&mut self) -> &mut Spans {
&mut self.spans
}
}

impl Default for Expr {
Expand Down Expand Up @@ -1211,6 +1235,23 @@ impl GroupingSet {
}
}

/// Lambda expression.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct Lambda {
pub params: Vec<String>,
pub body: Box<Expr>,
}

impl Lambda {
/// Create a new lambda expression
pub fn new(params: Vec<String>, body: Expr) -> Self {
Self {
params,
body: Box::new(body),
}
}
}

#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
#[cfg(not(feature = "sql"))]
pub struct IlikeSelectItem {
Expand Down Expand Up @@ -1525,6 +1566,8 @@ impl Expr {
#[expect(deprecated)]
Expr::Wildcard { .. } => "Wildcard",
Expr::Unnest { .. } => "Unnest",
Expr::Lambda { .. } => "Lambda",
Expr::LambdaColumn { .. } => "LambdaColumn",
}
}

Expand Down Expand Up @@ -2078,7 +2121,9 @@ impl Expr {
| Expr::Wildcard { .. }
| Expr::WindowFunction(..)
| Expr::Literal(..)
| Expr::Placeholder(..) => false,
| Expr::Placeholder(..)
| Expr::Lambda(..)
| Expr::LambdaColumn(..) => false,
}
}

Expand Down Expand Up @@ -2674,6 +2719,17 @@ impl HashNode for Expr {
column.hash(state);
}
Expr::Unnest(Unnest { expr: _expr }) => {}
Expr::Lambda(Lambda { params, body: _ }) => {
params.hash(state);
}
Expr::LambdaColumn(LambdaColumn {
name,
field,
spans: _,
}) => {
name.hash(state);
field.hash(state);
}
};
}
}
Expand Down Expand Up @@ -2987,6 +3043,12 @@ impl Display for SchemaDisplay<'_> {
}
}
}
Expr::Lambda(Lambda { params, body }) => {
write!(f, "({}) -> {body}", display_comma_separated(params))
}
Expr::LambdaColumn(c) => {
write!(f, "{}", c.name)
}
}
}
}
Expand Down Expand Up @@ -3167,6 +3229,9 @@ impl Display for SqlDisplay<'_> {
}
}
}
Expr::Lambda(Lambda { params, body }) => {
write!(f, "({}) -> {}", params.join(", "), SchemaDisplay(body))
}
_ => write!(f, "{}", self.0),
}
}
Expand Down Expand Up @@ -3474,6 +3539,12 @@ impl Display for Expr {
Expr::Unnest(Unnest { expr }) => {
write!(f, "{UNNEST_COLUMN_PREFIX}({expr})")
}
Expr::Lambda(Lambda { params, body }) => {
write!(f, "({}) -> {body}", params.join(", "))
}
Expr::LambdaColumn(c) => {
write!(f, "{}", c.name)
}
}
}
}
Expand Down
37 changes: 29 additions & 8 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@
use super::{Between, Expr, Like};
use crate::expr::{
AggregateFunction, AggregateFunctionParams, Alias, BinaryExpr, Cast, InList,
InSubquery, Placeholder, ScalarFunction, TryCast, Unnest, WindowFunction,
InSubquery, Lambda, Placeholder, ScalarFunction, TryCast, Unnest, WindowFunction,
WindowFunctionParams,
};
use crate::expr::{FieldMetadata, LambdaColumn};
use crate::type_coercion::functions::{
data_types_with_scalar_udf, fields_with_aggregate_udf, fields_with_window_udf,
fields_with_aggregate_udf, fields_with_window_udf,
};
use crate::{
type_coercion::functions::data_types_with_scalar_udf, udf::ReturnFieldArgs, utils,
LogicalPlan, Projection, Subquery, WindowFunctionDefinition,
};
use arrow::datatypes::FieldRef;
use arrow::{
compute::can_cast_types,
datatypes::{DataType, Field},
};
use crate::udf::ReturnFieldArgs;
use crate::{utils, LogicalPlan, Projection, Subquery, WindowFunctionDefinition};
use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::metadata::FieldMetadata;
use datafusion_common::{
not_impl_err, plan_datafusion_err, plan_err, Column, DataFusionError, ExprSchema,
Result, Spans, TableReference,
Expand Down Expand Up @@ -229,6 +234,10 @@ impl ExprSchemable for Expr {
// Grouping sets do not really have a type and do not appear in projections
Ok(DataType::Null)
}
Expr::Lambda(Lambda { params: _, body }) => body.get_type(schema),
Copy link
Copy Markdown
Contributor

@pepijnve pepijnve Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it an issue that this is technically incorrect? The type of the lambda expression itself is actually a function type, where here it's presented as just its return type. Can this lead to incorrect type analysis?

I haven't thought this through fully, but would it maybe be better to not have lambda functions be an Expr? Lambda functions don't really fit into the arrow type system. You would then have ot use an enum type that's either Expr or Lambda for the higher-order function parameters. Maybe that greatly complicates everything else?

Copy link
Copy Markdown
Contributor Author

@gstvg gstvg Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's either this, DataType::Null or an error. I don't have a strong opinion on this, actually. Right now, the only place I call it is literally like this:

.map(|arg: Expr| {
    let field = arg.to_field(schema)?.1;
        match arg {
            Expr::Lambda(_lambda) => Ok(ValueOrLambda::Lambda(field)),
            _ => Ok(ValueOrLambda::Value(field)),
        }
})

We can just change it to

.map(|arg: Expr| {
        match arg {
            Expr::Lambda(lambda) => Ok(ValueOrLambda::Lambda(lambda.body.to_field(schema)?.1)),
            _ => Ok(ValueOrLambda::Value(arg.to_field(schema)?.1)),
        }
})

During higher order function type coercion, lambda arguments are ignored and only value arguments are checked and coerced so the return type for a lamba is irrelevant.

Some other expressions return DataType::Null like logical Placeholder without a specifed type, Wildcard and GroupingSet and physical NoOp and UnknownColumn. If you think that's better I'm happy to change it

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does seem that the contract of get_type is to return the return type no? (at least going by the boolean expressions that always return bool)? imo I think it's more honest to return DataType::Null than having it return the body type, also because as @pepijnve mentions, there is no actual Arrow type for a lambda. At the call site you'd then be more explicit with handling the body type: lambda.body.to_field(schema)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, done at 96d8ad2, thanks

Expr::LambdaColumn(LambdaColumn { name: _, field, .. }) => {
Ok(field.data_type().clone())
}
}
}

Expand Down Expand Up @@ -347,6 +356,8 @@ impl ExprSchemable for Expr {
// in projections
Ok(true)
}
Expr::Lambda(l) => l.body.nullable(input_schema),
Expr::LambdaColumn(c) => Ok(c.field.is_nullable()),
}
}

Expand Down Expand Up @@ -543,6 +554,7 @@ impl ExprSchemable for Expr {
.into_iter()
.map(|f| (f.data_type().clone(), f))
.unzip();

// Verify that function is invoked with correct number and type of arguments as defined in `TypeSignature`
let new_data_types = data_types_with_scalar_udf(&arg_types, func)
.map_err(|err| {
Expand Down Expand Up @@ -573,9 +585,16 @@ impl ExprSchemable for Expr {
_ => None,
})
.collect::<Vec<_>>();

let lambdas = args
.iter()
.map(|e| matches!(e, Expr::Lambda { .. }))
.collect::<Vec<_>>();

let args = ReturnFieldArgs {
arg_fields: &new_fields,
scalar_arguments: &arguments,
lambdas: &lambdas,
};

func.return_field_from_args(args)
Expand All @@ -600,11 +619,13 @@ impl ExprSchemable for Expr {
| Expr::Wildcard { .. }
| Expr::GroupingSet(_)
| Expr::Placeholder(_)
| Expr::Unnest(_) => Ok(Arc::new(Field::new(
| Expr::Unnest(_)
| Expr::Lambda(_) => Ok(Arc::new(Field::new(
&schema_name,
self.get_type(schema)?,
self.nullable(schema)?,
))),
Expr::LambdaColumn(c) => Ok(Arc::clone(&c.field)),
}?;

Ok((
Expand Down
Loading