Add lambda substrait support#21193

Open
gstvg wants to merge 9 commits intoapache:mainfrom
gstvg:lambda_substrait
Conversation

@gstvg
Contributor

@gstvg gstvg commented Mar 27, 2026

Which issue does this PR close?

Part of #21172

Rationale for this change

Substrait support was left out of the core lambda PR to keep that PR small.

What changes are included in this PR?

Substrait consumer and producer support for higher-order functions, lambdas, and lambda variables.

Are these changes tested?

Unit tests added to datafusion/substrait/tests/cases/roundtrip_logical_plan.rs

Are there any user-facing changes?

Yes, there are breaking changes: new methods without default implementations have been added to SubstraitConsumer and SubstraitProducer.

@github-actions github-actions Bot added sql SQL Planner logical-expr Logical plan and expressions physical-expr Changes to the physical-expr crates optimizer Optimizer rules core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) substrait Changes to the substrait crate catalog Related to the catalog crate common Related to common crate execution Related to the execution crate proto Related to proto crate functions Changes to functions implementation datasource Changes to the datasource crate ffi Changes to the ffi crate labels Mar 27, 2026
@gstvg gstvg mentioned this pull request Mar 27, 2026
23 tasks
@benbellick
Contributor

👋 Hello from @substrait-io. Great to see the core lambda PR has gotten through! Once this PR is in a ready to review state and is rebased off of main, I will be more than happy to help review it 🙂

@gstvg
Contributor Author

gstvg commented Apr 29, 2026

Thanks @benbellick, I will open this tonight. Besides rebasing, I believe it is still missing some tests (I tested with sqllogictests only).

@github-actions github-actions Bot added documentation Improvements or additions to documentation spark and removed documentation Improvements or additions to documentation sql SQL Planner logical-expr Logical plan and expressions physical-expr Changes to the physical-expr crates optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) catalog Related to the catalog crate common Related to common crate execution Related to the execution crate proto Related to proto crate functions Changes to functions implementation labels Apr 29, 2026
@github-actions github-actions Bot removed datasource Changes to the datasource crate ffi Changes to the ffi crate spark labels Apr 29, 2026
@gstvg gstvg force-pushed the lambda_substrait branch from dc5e115 to 926b7e8 Compare April 29, 2026 23:37
@gstvg gstvg force-pushed the lambda_substrait branch from 926b7e8 to df1b740 Compare April 30, 2026 02:20
@github-actions github-actions Bot removed the core Core DataFusion crate label Apr 30, 2026
@gstvg gstvg marked this pull request as ready for review April 30, 2026 03:56
@gstvg
Contributor Author

gstvg commented Apr 30, 2026

@benbellick this is ready for review. The failing CI check correctly detects the breaking changes, but apparently fails to post a comment here with the summary of changes.

@benbellick
Contributor

Great! I will try and find some time to take a look tomorrow :)

Thanks for working on this!

@gstvg gstvg changed the title [DRAFT] Add lambda substrait support Add lambda substrait support Apr 30, 2026
@rluvaton
Member

I updated your branch with main, since the detect-breaking-changes check has now been fixed. Sorry for the trouble.

@rluvaton rluvaton added the api change Changes the API exposed to users of the crate label Apr 30, 2026
Contributor

@benbellick benbellick left a comment

Overall it looks good to me! I left a few comments on stylistic things, but the only thing I would particularly like to see is tests that exercise the consumer and producer independently. Thanks!

Comment thread datafusion/substrait/src/logical_plan/producer/expr/scalar_function.rs Outdated
/// Default implementation of lambda related methods of the [SubstraitConsumer] trait
///
/// Can be embedded into a custom [SubstraitConsumer] to implement them
pub struct DefaultSubstraitLambdaConsumer {
Contributor

Is there a reason this is public? This feels like an implementation detail of the default lambda-handling logic. What about:

Suggested change
pub struct DefaultSubstraitLambdaConsumer {
struct LambdaConsumerState {

Contributor Author

The existing required methods for trait SubstraitConsumer are trivial to implement, but that's not the case for the newly added lambda methods. This is just a convenience for custom implementations that don't want to customize the default lambda handling. Should I remove it?

struct CustomSubstraitConsumer {
    extensions: Arc<Extensions>,
    state: Arc<SessionState>,
    // You can reuse existing consumer code related to lambdas
    lambda_consumer: DefaultSubstraitLambdaConsumer,
}

#[async_trait]
impl SubstraitConsumer for CustomSubstraitConsumer {
    async fn resolve_table_ref(
        &self,
        table_ref: &TableReference,
    ) -> Result<Option<Arc<dyn TableProvider>>> {
        let table = table_ref.table().to_string();
        let schema = self.state.schema_for_ref(table_ref.clone())?;
        let table_provider = schema.table(&table).await?;
        Ok(table_provider)
    }

    fn get_extensions(&self) -> &Extensions {
        self.extensions.as_ref()
    }

    fn get_function_registry(&self) -> &impl FunctionRegistry {
        self.state.as_ref()
    }

    // Forward the lambda scope handling to the embedded default consumer
    fn with_lambda_parameters(
        &self,
        lambda_parameters: &[Type],
        input_schema: &DFSchema,
    ) -> datafusion::common::Result<(Vec<String>, Self)> {
        let (names, lambda_consumer) = self.lambda_consumer.with_lambda_parameters(
            self,
            lambda_parameters,
            input_schema,
        )?;

        Ok((
            names,
            Self {
                extensions: self.extensions.clone(),
                state: self.state.clone(),
                lambda_consumer,
            },
        ))
    }

    fn lambda_variable(
        &self,
        steps_out: usize,
        field_idx: usize,
    ) -> datafusion::common::Result<Expr> {
        self.lambda_consumer.lambda_variable(steps_out, field_idx)
    }
}

Contributor

Ah I see now, I appreciate the explanation. That makes a lot of sense then!

AFAICT the strategy for outer schemas is to provide default impls so that consumer implementors who don't care about the behavior can ignore their existence, but then will encounter a runtime error if they are used:

/// Push an outer schema onto the stack when entering a subquery.
fn push_outer_schema(&self, _schema: Arc<DFSchema>) {}
/// Pop an outer schema from the stack when leaving a subquery.
fn pop_outer_schema(&self) {}
/// Get the outer schema at the given nesting depth.
/// `steps_out = 1` is the immediately enclosing query, `steps_out = 2`
/// is two levels out, etc. Returns `None` if `steps_out` is 0 or
/// exceeds the current nesting depth (the caller should treat this as
/// an error in the Substrait plan).
fn get_outer_schema(&self, _steps_out: usize) -> Option<Arc<DFSchema>> {
    None

I'm wondering if we should do the same thing for lambdas? Rather than enforce that implementors must implement the lambda-handling fns, they could instead optionally ignore them, resulting in an error if lambda expressions are encountered.

What do you think? I am not so particular here TBH. Ultimately my goal is to validate that the translation itself is correct, and I am happy to leave API concerns to the project maintainers :)
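The default-impl idea above can be sketched without any DataFusion types. This is only an illustration under stated assumptions: `Consumer`, `PlanError`, `Minimal`, and `WithLambdas` are hypothetical names, not the real SubstraitConsumer API, and the sketch assumes that `steps_out = 0` refers to the innermost lambda.

```rust
// Hypothetical stand-in for DataFusion's error type.
#[derive(Debug, PartialEq)]
pub struct PlanError(pub String);

// Simplified, hypothetical consumer trait (not the real SubstraitConsumer).
pub trait Consumer {
    /// Required method, analogous to the existing trivial ones.
    fn name(&self) -> &str;

    /// Optional hook: the default returns an error, so implementors that never
    /// encounter lambdas do not have to write anything.
    fn lambda_variable(&self, steps_out: usize, field_idx: usize) -> Result<String, PlanError> {
        Err(PlanError(format!(
            "{} does not support lambda variables (steps_out={steps_out}, field_idx={field_idx})",
            self.name()
        )))
    }
}

/// Implements only the required method and inherits the erroring default.
pub struct Minimal;

impl Consumer for Minimal {
    fn name(&self) -> &str {
        "Minimal"
    }
}

/// Opts in to lambda support by overriding the default.
pub struct WithLambdas {
    /// Parameter names per lambda scope, outermost first.
    pub scopes: Vec<Vec<String>>,
}

impl Consumer for WithLambdas {
    fn name(&self) -> &str {
        "WithLambdas"
    }

    fn lambda_variable(&self, steps_out: usize, field_idx: usize) -> Result<String, PlanError> {
        // Assumption: steps_out = 0 is the innermost (last pushed) scope.
        self.scopes
            .len()
            .checked_sub(steps_out + 1)
            .and_then(|depth| self.scopes[depth].get(field_idx))
            .cloned()
            .ok_or_else(|| {
                PlanError(format!(
                    "no lambda parameter at steps_out={steps_out}, field_idx={field_idx}"
                ))
            })
    }
}
```

With this shape, adding the lambda methods is not a breaking change for existing implementors, at the cost of moving the failure from compile time to the moment a lambda expression is actually consumed.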

Contributor Author

Ah, I agree, they definitely should have a default impl returning an error to avoid a breaking change, thank you (2c23439).
Even then, I still believe a public DefaultSubstraitLambdaConsumer is convenient, but yes, let's leave that to the maintainers, thanks.

Comment thread datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs Outdated
Comment thread datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs Outdated
Comment thread datafusion/substrait/src/logical_plan/consumer/expr/scalar_function.rs Outdated
/// Default implementation of lambda related methods of the [SubstraitProducer] trait
///
/// Can be embedded into a custom [SubstraitProducer] to implement them
pub struct DefaultSubstraitLambdaProducer {
Contributor

Same comment as on the consumer side. I wonder if we can just keep this private, since its usage in implementing this producer is an implementation detail.

Contributor Author

@gstvg gstvg May 8, 2026

Same reason as for the consumer. Whatever gets decided there (#21193 (comment)) I'll also apply here.

struct CustomSubstraitProducer {
    extensions: Extensions,
    state: Arc<SessionState>,
    // You can reuse existing producer code related to lambdas
    lambda_producer: DefaultSubstraitLambdaProducer,
}

impl SubstraitProducer for CustomSubstraitProducer {
    fn register_function(&mut self, signature: String) -> u32 {
        self.extensions.register_function(&signature)
    }

    fn register_type(&mut self, type_name: String) -> u32 {
        self.extensions.register_type(&type_name)
    }

    fn get_extensions(self) -> Extensions {
        self.extensions
    }

    fn push_lambda_parameters(
        &mut self,
        lambda_parameters: Vec<FieldRef>,
    ) -> datafusion::common::Result<()> {
        let lambda_parameters_map = lambda_parameters_map(self, lambda_parameters)?;

        self.lambda_producer
            .push_lambda_parameters(lambda_parameters_map);

        Ok(())
    }

    fn pop_lambda_parameters(&mut self) -> datafusion::common::Result<()> {
        self.lambda_producer.pop_lambda_parameters()
    }

    fn lambda_variable(&self, name: &str) -> datafusion::common::Result<(u32, i32)> {
        self.lambda_producer.lambda_variable(name)
    }

    fn lambda_parameter_type(
        &self,
        name: &str,
    ) -> datafusion::common::Result<substrait::proto::Type> {
        self.lambda_producer.lambda_parameter_type(name)
    }
}
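For readers unfamiliar with the push/pop pattern used on the producer side, here is a minimal standalone sketch of a scope stack that resolves a lambda variable name to a `(steps_out, field_idx)` pair. `LambdaScopes` is a hypothetical type, not the actual DefaultSubstraitLambdaProducer, and it assumes that `steps_out = 0` means the innermost lambda and that inner parameters shadow outer ones with the same name.

```rust
/// Hypothetical producer-side stack of lambda scopes (innermost scope is last).
#[derive(Default)]
pub struct LambdaScopes {
    stack: Vec<Vec<String>>,
}

impl LambdaScopes {
    /// Enter a lambda: its parameter names become the innermost scope.
    pub fn push(&mut self, names: Vec<String>) {
        self.stack.push(names);
    }

    /// Leave a lambda; popping with no open scope is reported as an error.
    pub fn pop(&mut self) -> Result<(), String> {
        self.stack
            .pop()
            .map(|_| ())
            .ok_or_else(|| "no lambda scope to pop".to_string())
    }

    /// Resolve `name` to (steps_out, field_idx), searching the innermost scope
    /// first so that an inner parameter shadows an outer one of the same name.
    pub fn lambda_variable(&self, name: &str) -> Option<(u32, i32)> {
        self.stack
            .iter()
            .rev()
            .enumerate()
            .find_map(|(steps_out, scope)| {
                scope
                    .iter()
                    .position(|n| n.as_str() == name)
                    .map(|idx| (steps_out as u32, idx as i32))
            })
    }
}
```

Pushing `[x, y]` and then `[x]` resolves `x` to the inner scope and `y` to the outer one. After popping the inner scope, `x` resolves to the outer parameter again, which is the unshadowing case.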

Comment thread datafusion/substrait/src/logical_plan/producer/expr/scalar_function.rs Outdated
Contributor

These tests are great!

One additional thing that might be useful is a small number of tests that exercise the producer and consumer independently. The roundtrip tests verify that the producer and consumer are internally consistent with each other, but they don’t make it as obvious what Substrait representation we expect to support.

There is some precedent for both styles:

  • Consumer-side tests that load Substrait JSON and convert it to a DataFusion plan:
    "tests/testdata/test_plans/emit_kind/direct_on_project.substrait.json",
  • Producer-side tests that call to_substrait_plan and inspect the generated proto:
    let plan = to_substrait_plan(&datafusion_plan, &ctx.state())?
        .as_ref()
        .clone();
    let relation = plan.relations.first().unwrap().rel_type.as_ref();
    let root_rel = match relation {
        Some(RelType::Root(root)) => root.input.as_ref().unwrap(),
        _ => panic!("expected Root"),
    };
    if let Some(rel::RelType::Project(p)) = root_rel.rel_type.as_ref() {
        // The input has 2 columns [a, b], the Projection has 3 expressions [b, a + a, a]
        // The required output mapping is [2,3,4], which skips the 2 input columns.
        assert_emit(p.common.as_ref(), vec![2, 3, 4]);

It might be nice to add one or two similar tests for lambdas, so the expected Substrait shape for Lambda / LambdaParameterReference is documented by the tests.

Contributor Author

@gstvg gstvg May 8, 2026

That makes sense. Added one consumer test at 1fc0f05 and one producer test at 6cdfc05, both using an expression which should cover all edge cases (nesting, multiple parameters, [un]shadowing, column name conflict and outer references).

Contributor Author

@benbellick From the Substrait translation point of view, do you think these tests are enough, so that we can ping those who also reviewed the other lambda PRs?

@github-actions github-actions Bot added the auto detected api change Auto detected API change label May 8, 2026
Comment on lines +532 to +539
/// Returns a new instance of this consumer which includes the given `lambda_parameters` and the names they got assigned
///
/// Note for custom implementations it's possible to embed a [DefaultSubstraitLambdaConsumer] and forward this method to it
fn with_lambda_parameters(
    &self,
    lambda_parameters: &[Type],
    input_schema: &DFSchema,
) -> datafusion::common::Result<(Vec<String>, Self)>;
Contributor Author

@benbellick I usually follow the pattern of existing methods, like push/pop_outer_schema from #20439. This is my first time dealing with Substrait, so I may be wrong, but I didn't follow that pattern here (using push/pop_lambda_parameter) because it modifies &self via a RwLock, and I'm not sure this couldn't lead to conflicts if the same consumer is used to consume two different plans at the same time in different threads. If that's not the case, I can change this to use push/pop_lambda_parameter as well.

Contributor

Ah, that is an interesting point... Is it expected/supported for the same SubstraitConsumer instance to be used concurrently?

If not, then I think it would be simpler and consistent to model lambda scope the same way as outer schemas.

If yes, then the scoped-consumer approach here makes sense, but it seems like the existing push_outer_schema / pop_outer_schema stack may have the same interleaving issue and should probably be addressed separately.

Contributor Author

I'll confirm with the maintainer who ends up reviewing this, but SubstraitConsumer is both Send + Sync and all its methods take &self. Since the default consumer is cheap to create, and I expect most or all custom ones to be cheap as well, I guess this is mostly because of async, and to make it easy to embed in other structures that must also implement Send + Sync, rather than to allow efficient concurrent usage.
I wouldn't expect any consumer to be used concurrently for performance, but since it can be, I think it's possible that it is, or will be, used concurrently incidentally, as the easier or more natural way within a given codebase.
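To make the concurrency argument concrete, here is a minimal sketch of the scoped-consumer idea using only the standard library. `ScopedConsumer` and `resolve_in_two_threads` are hypothetical and much simpler than the real trait. The sketch assumes that `steps_out = 0` is the innermost lambda and stores only parameter names. Entering a lambda returns a new value instead of mutating shared state, so two threads consuming different plans from the same root cannot see each other's scopes.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical scoped consumer: lambda scopes are immutable once built.
#[derive(Clone, Default)]
pub struct ScopedConsumer {
    /// Parameter names per lambda scope, outermost first.
    scopes: Arc<Vec<Vec<String>>>,
}

impl ScopedConsumer {
    /// Returns a new consumer that sees one more lambda scope; `self` is untouched.
    pub fn with_lambda_parameters(&self, names: &[&str]) -> Self {
        let mut scopes = (*self.scopes).clone();
        scopes.push(names.iter().map(|n| n.to_string()).collect());
        Self { scopes: Arc::new(scopes) }
    }

    /// Looks up a parameter; assumes steps_out = 0 is the innermost scope.
    pub fn lambda_variable(&self, steps_out: usize, field_idx: usize) -> Option<&str> {
        let depth = self.scopes.len().checked_sub(steps_out + 1)?;
        self.scopes.get(depth)?.get(field_idx).map(String::as_str)
    }
}

/// Two threads share one root consumer; each builds its own scope from it.
pub fn resolve_in_two_threads(root: &ScopedConsumer) -> (Option<String>, Option<String>) {
    thread::scope(|s| {
        let a = s.spawn(|| {
            let consumer = root.with_lambda_parameters(&["a"]);
            let name = consumer.lambda_variable(0, 0).map(str::to_owned);
            name
        });
        let b = s.spawn(|| {
            let consumer = root.with_lambda_parameters(&["b"]);
            let name = consumer.lambda_variable(0, 0).map(str::to_owned);
            name
        });
        (a.join().unwrap(), b.join().unwrap())
    })
}
```

A shared stack behind a RwLock would instead let one thread's push land between another thread's push and lookup, which is the interleaving concern raised in this thread.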

Comment thread datafusion/substrait/tests/cases/roundtrip_logical_plan.rs Outdated
@github-actions github-actions Bot removed the auto detected api change Auto detected API change label May 9, 2026

Labels

api change Changes the API exposed to users of the crate substrait Changes to the substrait crate

3 participants