Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
df1b740
feat: add substrait lambda support
gstvg Apr 29, 2026
809d5db
Merge branch 'main' into lambda_substrait
rluvaton Apr 30, 2026
feace02
add some comments
gstvg Apr 30, 2026
35e3bcf
Merge branch 'main' of https://github.com/apache/datafusion into lamb…
gstvg May 7, 2026
9b6a8e1
address pr review
gstvg May 8, 2026
a153093
improve roundtrip tests
gstvg May 8, 2026
1fc0f05
add consumer test
gstvg May 8, 2026
6cdfc05
add producer test
gstvg May 8, 2026
2c23439
add default impl to new lambda methods
gstvg May 9, 2026
3d83fb4
Apply suggestion from @benbellick
gstvg May 11, 2026
83e261f
improve serialize test readability
gstvg May 11, 2026
c019769
add lambda field reference tests
gstvg May 11, 2026
93f17e4
add lambda tests
gstvg May 11, 2026
347bf12
check invalid lambda
gstvg May 11, 2026
59fb308
add comment to collect_lambda_ref test helper
gstvg May 14, 2026
e9a9441
Merge branch 'main' of https://github.com/gstvg/arrow-datafusion into…
gstvg May 14, 2026
898bb21
fix for new sqlparser version
gstvg May 14, 2026
d78d49c
remove derive(Clone) from Extensions
gstvg May 19, 2026
ad22039
Merge branch 'main' into lambda_substrait
gstvg May 19, 2026
44384dd
use push/pop_lambda_parameters instead of with_lambda_parameters
gstvg May 19, 2026
bee313a
fix doc comment
gstvg May 19, 2026
555a49f
revert submodule bump
gstvg May 19, 2026
0cbdec7
Merge branch 'main' of https://github.com/apache/datafusion into lamb…
gstvg May 26, 2026
b737f69
use new session ctx methods instead of FunctionRegistry trait
gstvg May 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/substrait/src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use substrait::proto::extensions::simple_extension_declaration::{
/// types. This struct facilitates the use of these extensions in DataFusion.
/// TODO: DF doesn't yet use extensions for type variations <https://github.com/apache/datafusion/issues/11544>
/// TODO: DF doesn't yet provide valid extensionUris <https://github.com/apache/datafusion/issues/11545>
#[derive(Default, Debug, PartialEq)]
#[derive(Clone, Default, Debug, PartialEq)]
Comment thread
gabotechs marked this conversation as resolved.
Outdated
pub struct Extensions {
pub functions: HashMap<u32, String>, // anchor -> function name
pub types: HashMap<u32, String>, // anchor -> type name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use datafusion::logical_expr::Expr;
use std::sync::Arc;
use substrait::proto::expression::FieldReference;
use substrait::proto::expression::field_reference::ReferenceType::DirectReference;
use substrait::proto::expression::field_reference::RootType;
use substrait::proto::expression::field_reference::{LambdaParameterReference, RootType};
use substrait::proto::expression::reference_segment::ReferenceType::StructField;

pub async fn from_field_reference(
Expand Down Expand Up @@ -56,9 +56,9 @@ pub(crate) fn from_substrait_field_reference(
Some(RootType::Expression(_)) => not_impl_err!(
"Expression root type in field reference is not supported"
),
Some(RootType::LambdaParameterReference(_)) => not_impl_err!(
"Lambda parameter reference in field reference is not yet supported"
),
Some(RootType::LambdaParameterReference(
LambdaParameterReference { steps_out },
)) => consumer.lambda_variable(*steps_out as usize, field_idx),
}
}
_ => not_impl_err!(
Expand Down
47 changes: 47 additions & 0 deletions datafusion/substrait/src/logical_plan/consumer/expr/lambda.rs
Comment thread
gstvg marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::{
common::{DFSchema, substrait_err},
prelude::{Expr, lambda},
};
use substrait::proto;

use crate::logical_plan::consumer::SubstraitConsumer;

/// Converts a Substrait lambda expression into a DataFusion lambda [`Expr`].
///
/// The lambda's parameter types are handed to
/// `SubstraitConsumer::with_lambda_parameters`, which yields the parameter
/// names together with a consumer that is then used to convert the lambda
/// body. (Presumably that consumer is what resolves lambda parameter
/// references inside the body; confirm against the trait implementation.)
///
/// # Errors
///
/// Returns a Substrait error when the lambda has no `parameters` or no
/// `body`, and propagates any error raised while deriving the parameters or
/// while converting the body expression.
pub async fn from_lambda(
    consumer: &impl SubstraitConsumer,
    expr: &proto::expression::Lambda,
    input_schema: &DFSchema,
) -> datafusion::common::Result<Expr> {
    // A lambda must declare its parameter struct, even if that struct is empty.
    let param_types = match expr.parameters.as_ref() {
        Some(parameters) => &parameters.types,
        None => {
            return substrait_err!("Lambda expression without parameters is not allowed");
        }
    };

    // Parameters are registered before the body is inspected, so a failure
    // here takes precedence over a missing-body error.
    let (names, scoped_consumer) =
        consumer.with_lambda_parameters(param_types, input_schema)?;

    let body_proto = match expr.body.as_ref() {
        Some(body) => body,
        None => {
            return substrait_err!("Lambda expression without body is not allowed");
        }
    };

    // The body is converted with the parameter-aware consumer, not the outer one.
    let converted_body = scoped_consumer
        .consume_expression(body_proto, input_schema)
        .await?;

    Ok(lambda(names, converted_body))
}
9 changes: 7 additions & 2 deletions datafusion/substrait/src/logical_plan/consumer/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod cast;
mod field_reference;
mod function_arguments;
mod if_then;
mod lambda;
mod literal;
mod nested;
mod scalar_function;
Expand All @@ -32,6 +33,7 @@ pub use cast::*;
pub use field_reference::*;
pub use function_arguments::*;
pub use if_then::*;
pub use lambda::*;
pub use literal::*;
pub use nested::*;
pub use scalar_function::*;
Expand Down Expand Up @@ -95,8 +97,11 @@ pub async fn from_substrait_rex(
RexType::DynamicParameter(expr) => {
consumer.consume_dynamic_parameter(expr, input_schema).await
}
RexType::Lambda(_) | RexType::LambdaInvocation(_) => {
not_impl_err!("Lambda expressions are not yet supported")
RexType::Lambda(lambda) => {
consumer.consume_lambda(lambda.as_ref(), input_schema).await
}
RexType::LambdaInvocation(_) => {
not_impl_err!("Lambda invocations are not supported")
}
},
None => substrait_err!("Expression must set rex_type: {expression:?}"),
Expand Down
Comment thread
gstvg marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub async fn from_scalar_function(
f: &ScalarFunction,
input_schema: &DFSchema,
) -> Result<Expr> {
//TODO: handle higher order functions, as they are also encoded as scalar functions
let Some(fn_signature) = consumer
.get_extensions()
.functions
Expand All @@ -45,6 +44,20 @@ pub async fn from_scalar_function(
let fn_name = substrait_fun_name(fn_signature);
let args = from_substrait_func_args(consumer, &f.arguments, input_schema).await?;

let higher_order_func = consumer
.get_function_registry()
.higher_order_function(fn_name)
.or_else(|e| {
if let Some(alt_name) = substrait_to_df_name(fn_name) {
consumer
.get_function_registry()
.higher_order_function(alt_name)
.or(Err(e))
} else {
Err(e)
}
});

let udf_func = consumer.get_function_registry().udf(fn_name).or_else(|e| {
if let Some(alt_name) = substrait_to_df_name(fn_name) {
consumer.get_function_registry().udf(alt_name).or(Err(e))
Expand All @@ -53,9 +66,14 @@ pub async fn from_scalar_function(
}
});

// try to first match the requested function into registered udfs, then built-in ops
// try to first match the requested function into registered higher-order functions, then udfs, built-in ops
// and finally built-in expressions
if let Ok(func) = udf_func {
if let Ok(func) = higher_order_func {
Ok(Expr::HigherOrderFunction(expr::HigherOrderFunction::new(
func.to_owned(),
args,
)))
} else if let Ok(func) = udf_func {
Ok(Expr::ScalarFunction(expr::ScalarFunction::new_udf(
func.to_owned(),
args,
Expand Down
Loading