proto: serialize dynamic filters on Sort, Aggregate, HashJoin plan nodes #22011
jayshrivastava wants to merge 3 commits into apache:main
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported that the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).
Builds on the prior `DynamicFilterPhysicalExpr` proto serialization + dedupe work so plan-node references to a shared dynamic filter survive roundtrip.

- Adds `dynamic_filter` to the proto messages for `SortExec`, `AggregateExec`, and `HashJoinExec` and wires it through to/from-proto.
- Exposes `dynamic_filter()` / `with_dynamic_filter()` on those plan nodes so the dedupe deserializer can reattach the shared `DynamicFilterPhysicalExpr` after roundtrip.
- Extracts `supported_accumulators_info()` on `AggregateExec` and uses it from `init_dynamic_filter` and `with_dynamic_filter`.
- Adds `test_hash_join_with_dynamic_filter_roundtrip`, `test_aggregate_with_dynamic_filter_roundtrip`, and `test_sort_topk_with_dynamic_filter_roundtrip` to verify that the plan node and the pushdown-target `ParquetSource` predicate end up pointing at the same `expression_id` after roundtrip.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
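To make the "same `expression_id` after roundtrip" goal concrete, here is a minimal sketch of id-based dedupe on the decode side. It is not the code in this PR: `DynamicFilter`, `FilterRef`, and `DedupeDecoder` are hypothetical stand-ins, and the only assumption carried over from the description is that each reference on the wire names its filter by `expression_id`.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for `DynamicFilterPhysicalExpr`; only the id matters here.
#[derive(Debug)]
struct DynamicFilter {
    expression_id: u64,
}

/// Hypothetical wire form: a reference to a filter by its id.
#[derive(Clone, Copy)]
struct FilterRef {
    expression_id: u64,
}

/// Hypothetical decoder that hands out one shared `Arc` per `expression_id`.
#[derive(Default)]
struct DedupeDecoder {
    seen: HashMap<u64, Arc<DynamicFilter>>,
}

impl DedupeDecoder {
    /// Returns the already-decoded filter for this id, or creates and caches it.
    fn decode(&mut self, r: FilterRef) -> Arc<DynamicFilter> {
        self.seen
            .entry(r.expression_id)
            .or_insert_with(|| {
                Arc::new(DynamicFilter {
                    expression_id: r.expression_id,
                })
            })
            .clone()
    }
}

/// Decodes the same id twice (once as the plan node's filter, once as the
/// scan predicate) and reports whether both point at the same allocation.
fn roundtrip_shares_filter(id: u64) -> bool {
    let mut decoder = DedupeDecoder::default();
    let on_plan_node = decoder.decode(FilterRef { expression_id: id });
    let on_scan_predicate = decoder.decode(FilterRef { expression_id: id });
    Arc::ptr_eq(&on_plan_node, &on_scan_predicate)
}
```

In this sketch, sharing is checked with `Arc::ptr_eq` rather than by comparing ids, since two separately allocated filters with equal ids would not propagate updates to each other.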
We may alternatively use I'll leave it up to reviewers for thoughts on this. This is the We currently expose all these expressions via public methods I continued this pattern by adding
If folks would still prefer using
0f5ea55 to 334ca91
cc @adriangb @LiaCastaneda let me know what you think about this one! (Lia is on vacation this week but I tagged her for when she's back)
stuhood left a comment
Reviewed everything but aggregates (don't know anything about how filters are used there), and this looks good to me. Thanks!
    if let Ok(df) = (dynamic_filter_expr as Arc<dyn Any + Send + Sync>)
        .downcast::<DynamicFilterPhysicalExpr>()
    {
Should this error if it deserializes something unexpected? Ditto elsewhere.
TYFR! Sorry I missed the @ btw. Will make sure to tag you next time 😄
Hmm yeah let's error. I'll push an update
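For reference, a minimal sketch of what erroring on an unexpected type could look like, in place of silently skipping when the `if let Ok(..)` downcast fails. The types here are hypothetical stand-ins (an empty `DynamicFilterPhysicalExpr`, an `OtherExpr`, and an `UnexpectedExprError`); the actual update would presumably use the crate's own error type, and the helper name `expect_dynamic_filter` is made up for illustration.

```rust
use std::any::Any;
use std::fmt;
use std::sync::Arc;

/// Hypothetical stand-in for the real `DynamicFilterPhysicalExpr`.
#[derive(Debug)]
struct DynamicFilterPhysicalExpr;

/// Hypothetical stand-in for any other expression type.
#[derive(Debug)]
struct OtherExpr;

/// Hypothetical error carrying the name of the plan node being decoded.
#[derive(Debug, PartialEq)]
struct UnexpectedExprError(&'static str);

impl fmt::Display for UnexpectedExprError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}: dynamic_filter did not deserialize to a DynamicFilterPhysicalExpr",
            self.0
        )
    }
}

impl std::error::Error for UnexpectedExprError {}

/// Downcasts the decoded expression, turning a type mismatch into an error
/// instead of dropping the filter.
fn expect_dynamic_filter(
    expr: Arc<dyn Any + Send + Sync>,
    node: &'static str,
) -> Result<Arc<DynamicFilterPhysicalExpr>, UnexpectedExprError> {
    expr.downcast::<DynamicFilterPhysicalExpr>()
        .map_err(|_| UnexpectedExprError(node))
}
```

A caller would then use `expect_dynamic_filter(expr, "SortExec")?` so that a malformed plan fails to deserialize rather than quietly losing its dynamic filter.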
Argh I hate that we keep having to leak more and more internal state to the public API just for serialization. Maybe we should just plow on as is but I think we as a community seriously have to consider #21949. We should discuss next sync.
Which issue does this PR close?

Rationale for this change

`SortExec`, `AggregateExec`, and `HashJoinExec` do not serialize their dynamic filters, so plans lose dynamic filtering when they are serialized and sent across network boundaries.

What changes are included in this PR?

This change adds `with_dynamic_filter_expr()` and `dynamic_filter_expr()` to `SortExec`, `AggregateExec`, and `HashJoinExec`. These are used as getters and setters for the `proto` crate to get and set dynamic filters.

Are these changes tested?

Yes. See `datafusion/datafusion/proto/tests/cases/roundtrip_physical_plan.rs`. There are also tests for the plan nodes in the `physical-plan` crate.

Are there any user-facing changes?

`SortExec`, `AggregateExec`, and `HashJoinExec` now roundtrip serialize dynamic filter expressions.