Add lambda substrait support#21193
Conversation
|
👋 Hello from @substrait-io. Great to see the core lambda PR has gotten through! Once this PR is in a ready to review state and is rebased off of main, I will be more than happy to help review it 🙂 |
|
Thanks @benbellick, I will open this tonight. Besides rebasing, I believe it misses some tests (I tested with sqllogictests only) |
|
@benbellick this is ready for review. Failing CI correctly detects breaking changes but apparently fails to create a comment here with the changes summary |
|
Great! I will try and find some time to take a look tomorrow :) Thanks for working on this! |
|
I updated your branch with main since the fix for detect breaking changes was resolved now, sorry for the trouble |
benbellick
left a comment
There was a problem hiding this comment.
It overall looks good to me! I left a few comments on some stylistic things but the only thing that I would particularly like to see is just the tests for consumer / producer independently. Thanks!
| /// Default implementation of lambda related methods of the [SubstraitConsumer] trait | ||
| /// | ||
| /// Can be embedded into a custom [SubstraitConsumer] to implement them | ||
| pub struct DefaultSubstraitLambdaConsumer { |
There was a problem hiding this comment.
Is there a reason this is public? This feels like an implementation detail of the default lambda-handling logic. What about:
| pub struct DefaultSubstraitLambdaConsumer { | |
| struct LambdaConsumerState { |
There was a problem hiding this comment.
The existing required methods for trait SubstraitConsumer are trivial to implement, but that's not the case for the newly added lambda methods. This is a just a convenience to custom implementations which don't want to customize the default lambda handling, should I remove it?
struct CustomSubstraitConsumer {
extensions: Arc<Extensions>,
state: Arc<SessionState>,
// You can reuse existing consumer code related to lambdas
lambda_consumer: DefaultSubstraitLambdaConsumer,
}
#[async_trait]
impl SubstraitConsumer for CustomSubstraitConsumer {
async fn resolve_table_ref(
&self,
table_ref: &TableReference,
) -> Result<Option<Arc<dyn TableProvider>>> {
let table = table_ref.table().to_string();
let schema = self.state.schema_for_ref(table_ref.clone())?;
let table_provider = schema.table(&table).await?;
Ok(table_provider)
}
fn get_extensions(&self) -> &Extensions {
self.extensions.as_ref()
}
fn get_function_registry(&self) -> &impl FunctionRegistry {
self.state.as_ref()
}
fn with_lambda_parameters(
&self,
lambda_parameters: &[Type],
input_schema: &DFSchema,
) -> datafusion::common::Result<(Vec<String>, Self)> {
let (names, lambda_consumer) = self.lambda_consumer.with_lambda_parameters(
self,
lambda_parameters,
input_schema,
)?;
Ok((
names,
Self {
extensions: self.extensions.clone(),
state: self.state.clone(),
lambda_consumer,
},
))
}
fn lambda_variable(
&self,
steps_out: usize,
field_idx: usize,
) -> datafusion::common::Result<Expr> {
self.lambda_consumer.lambda_variable(steps_out, field_idx)
}
}There was a problem hiding this comment.
Ah I see now, I appreciate the explanation. That makes a lot of sense then!
AFAICT the strategy for outer schemas is to provide default impls so that consumer implementors who don't care about the behavior can ignore their existence, but then will encounter a runtime error if they are used:
I'm wondering if we should do the same thing for lambdas? Rather than enforce that implementors must implement the lambda-handling fns, they could instead optionally ignore them, resulting in an error if lambda expressions are encountered.
What do you think? I am not so particular here TBH. Ultimately my goal is to validate that the translation itself is correct, and I am happy to leave API concerns to the project maintainers :)
There was a problem hiding this comment.
Ah, I agree, they definetively should have a default impl returning an error to avoid a breaking change, thank you 2c23439
But even then, I still believe a public DefaultSubstraitLambdaConsumer is convenient, but yes, let's leave that for that maintainers, thanks
| /// Default implementation of lambda related methods of the [SubstraitProducer] trait | ||
| /// | ||
| /// Can be embedded into a custom [SubstraitProducer] to implement them | ||
| pub struct DefaultSubstraitLambdaProducer { |
There was a problem hiding this comment.
Same comment as on the consumer side. I wonder if we can just keep this private, since its usage in implementing this producer is an implementation detail.
There was a problem hiding this comment.
Same reason for the consumer. What get's decided there (#21193 (comment)) I'll also apply here
struct CustomSubstraitProducer {
extensions: Extensions,
state: Arc<SessionState>,
// You can reuse existing producer code related to lambdas
lambda_producer: DefaultSubstraitLambdaProducer,
}
impl SubstraitProducer for CustomSubstraitProducer {
fn register_function(&mut self, signature: String) -> u32 {
self.extensions.register_function(&signature)
}
fn register_type(&mut self, type_name: String) -> u32 {
self.extensions.register_type(&type_name)
}
fn get_extensions(self) -> Extensions {
self.extensions
}
fn push_lambda_parameters(
&mut self,
lambda_parameters: Vec<FieldRef>,
) -> datafusion::common::Result<()> {
let lambda_parameters_map = lambda_parameters_map(self, lambda_parameters)?;
self.lambda_producer
.push_lambda_parameters(lambda_parameters_map);
Ok(())
}
fn pop_lambda_parameters(&mut self) -> datafusion::common::Result<()> {
self.lambda_producer.pop_lambda_parameters()
}
fn lambda_variable(&self, name: &str) -> datafusion::common::Result<(u32, i32)> {
self.lambda_producer.lambda_variable(name)
}
fn lambda_parameter_type(
&self,
name: &str,
) -> datafusion::common::Result<substrait::proto::Type> {
self.lambda_producer.lambda_parameter_type(name)
}
}There was a problem hiding this comment.
These tests are great!
One additional thing that might be useful is a small number of tests that exercise the producer and consumer independently. The roundtrip tests verify that the producer and consumer are internally consistent with each other, but they don’t make it as obvious what Substrait representation we expect to support.
There is some precedent for both styles:
- Consumer-side tests that load Substrait JSON and convert it to a DataFusion plan:
- Producer-side tests that call
to_substrait_planand inspect the generated proto:datafusion/datafusion/substrait/tests/cases/serialize.rs
Lines 114 to 126 in fa9ada3
It might be nice to add one or two similar tests for lambdas, so the expected Substrait shape for Lambda / LambdaParameterReference is documented by the tests.
There was a problem hiding this comment.
@benbellick From the substrait translation point of view, do you think these tests are enough, so we can ping those who also reviewed other lambda PR's?
| /// Returns a new instance of this consumer which includes the given `lambda_parameters` and the names they got assigned | ||
| /// | ||
| /// Note for custom implementations it's possible to embed a [DefaultSubstraitLambdaConsumer] and forward this method to it | ||
| fn with_lambda_parameters( | ||
| &self, | ||
| lambda_parameters: &[Type], | ||
| input_schema: &DFSchema, | ||
| ) -> datafusion::common::Result<(Vec<String>, Self)>; |
There was a problem hiding this comment.
@benbellick I usually follows the pattern of existing methods, like push/pop_outer_schema from #20439. This is my first time dealing with substrait, so I may be wrong, but I didn't followed this pattern (using push/pop_lambda_parameter) because it modifies &self via a RwLock and I'm note sure this couldn't lead to conflicts if the same consumer is used to consume two different plans at the same time in different threads. If that's not the case I can change this to use push/pop_lambda_parameter as well.
There was a problem hiding this comment.
Ah, that is an interesting point... Is it expected/supported for the same SubstraitConsumer instance to be used concurrently?
If not, then I think it would be simpler and consistent to model lambda scope the same way as outer schemas.
If yes, then the scoped-consumer approach here makes sense, but it seems like the existing push_outer_schema / pop_outer_schema stack may have the same interleaving issue and should probably be addressed separately.
There was a problem hiding this comment.
I'll confirm with the maintainer who ends up reviewing this, but SubstraitConsumer is both Send + Sync and all it's methods take &self. Since the default consumer is cheap to create, and I expect most/all custom ones to be cheap as well, I guess it's mostly due to async and to easily embed it into other structures which should also implement Send + Sync than to allow efficient concurrent usage.
I won't expect any consumer to be used concurrently for performance, but, since it can be used, I think it's possible that it's/will be used concurrently incidentally as the easier/natural way within a given codebase
Which issue does this PR close?
Part of #21172
Rationale for this change
Substrait support wasn't implemented in the core lambda support to reduce PR size
What changes are included in this PR?
Substrait consuming and producing of higher-order functions, lambdas and lambda variables
Are these changes tested?
Unit tests added to
datafusion/substrait/tests/cases/roundtrip_logical_plan.rsAre there any user-facing changes?
Yes, there are breaking changes, new methods without default implementation have been added to
SubstraitConsumerandSubstraitProducer