
Commit 9962d27

adriangb and claude authored
Split proto serialization to encapsulate private state (#21835) (#21929)
## Which issue does this PR close?

- Closes #21835.

## Rationale for this change

`datafusion-proto` serializes every built-in `PhysicalExpr` through a single ~300-line `downcast_ref` chain, with a mirror `match` on the decode side. That chain lives outside the crate where each expression is defined, so every field an expression wants to round-trip has to be made `pub`. #21807 is the cautionary tale: it had to add five `pub` "proto-only, not stable" items to `DynamicFilterPhysicalExpr` just to serialize an `RwLock`-wrapped inner.

This PR adds the infrastructure so a `PhysicalExpr` can serialize itself and keep its state private.

## What changes are included in this PR?

A `PhysicalExpr` can now opt into serializing itself, in both directions:

```rust
fn try_to_proto(&self, ctx: &PhysicalExprEncodeCtx) -> Result<Option<PhysicalExprNode>>
fn try_from_proto(node: &PhysicalExprNode, ctx: &PhysicalExprDecodeCtx) -> Result<Arc<dyn PhysicalExpr>>
```

`try_to_proto` returning `Ok(None)` (the default) means "fall through to the old downcast chain", so the change is purely additive: nothing is forced to migrate. `Column` and `BinaryExpr` are migrated as working demos; everything else stays on the old path and migrates later, one expression at a time, with no wire-format change.

Five stacked commits, each of which builds green on its own and is independently reviewable (or splittable into its own PR):

1. **Extract `datafusion-proto-models` crate**: move the `.proto` file and prost-generated types into a lightweight crate (mirrors the existing `datafusion-proto-common` split).
2. **Add the `try_to_proto` hook**: feature-gated, off by default.
3. **Migrate `Column` encode.**
4. **Add the decode side and migrate `Column` decode.**
5. **Migrate `BinaryExpr`** (both directions).
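The opt-in hook and its `Ok(None)` fallback can be illustrated with a minimal, self-contained sketch. Everything here is a toy stand-in, not the DataFusion API: `Node` replaces the prost-generated `PhysicalExprNode`, `Expr` replaces `PhysicalExpr`, the context argument is omitted, and `serialize` is a hypothetical name for the central entry point in `datafusion-proto`.

```rust
use std::any::Any;

/// Hypothetical stand-in for the prost-generated `PhysicalExprNode`.
#[derive(Debug, PartialEq)]
pub enum Node {
    Column { name: String, index: u32 },
    Literal(i64),
}

/// Toy expression trait: the hook defaults to `Ok(None)`, i.e. "not migrated".
pub trait Expr: Any {
    fn as_any(&self) -> &dyn Any;
    fn try_to_proto(&self) -> Result<Option<Node>, String> {
        Ok(None)
    }
}

/// Migrated expression: its fields stay private because it encodes itself.
pub struct Column {
    name: String,
    index: u32,
}

/// Unmigrated expression: the legacy chain needs `value` to be `pub`.
pub struct Literal {
    pub value: i64,
}

/// An expression that neither path knows how to serialize.
pub struct Opaque;

impl Expr for Column {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn try_to_proto(&self) -> Result<Option<Node>, String> {
        Ok(Some(Node::Column { name: self.name.clone(), index: self.index }))
    }
}

impl Expr for Literal {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl Expr for Opaque {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Central serializer: try the hook first, then fall through to the
/// legacy `downcast_ref` chain for expressions that returned `Ok(None)`.
pub fn serialize(expr: &dyn Expr) -> Result<Node, String> {
    if let Some(node) = expr.try_to_proto()? {
        return Ok(node);
    }
    if let Some(lit) = expr.as_any().downcast_ref::<Literal>() {
        return Ok(Node::Literal(lit.value));
    }
    Err("unsupported expression".to_string())
}
```

In this sketch `Column` takes the new path and `Literal` the old one, and both produce a `Node` through the same `serialize` call, which is the sense in which the change is additive.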
## A few design decisions worth flagging

- **`FromProto` / `TryFromProto` traits instead of plain `From` / `TryFrom`.** Once the prost types move into their own crate they are *foreign* to `datafusion-proto`, and the orphan rule forbids `impl From<&protobuf::X> for Y` when both `X` and `Y` are foreign. So those conversions become `FromProto` / `TryFromProto` traits in `datafusion_proto::convert`, and callers go from `(&x).into()` to `Y::from_proto(&x)`. This is a known workaround, not the end state; see Future work.
- **The ctx is a concrete struct, not `&dyn`.** `PhysicalExprEncodeCtx` / `PhysicalExprDecodeCtx` wrap a sealed dispatch trait. Keeping them concrete keeps `&dyn` out of every expression's signature and gives a stable place to add helpers (UDF encoding, registry hooks) later without churning a public trait.
- **`try_from_proto` takes the whole `PhysicalExprNode`**, not the pre-unwrapped variant payload, so every expression's decoder has the same signature and can still see outer-node fields like `expr_id`.

## Are these changes tested?

No new behavior, so no new tests. `Column` and `BinaryExpr` produce and consume the same wire format as before; the existing `roundtrip_physical_plan` / `roundtrip_physical_expr` tests already cover both directions and now exercise the new path.

## Are there any user-facing changes?

Small API breaks in `datafusion-proto`:

- `try_from_physical_plan_with_converter` / `try_into_physical_plan_with_converter` move to a `PhysicalPlanNodeExt` trait; callers add `use datafusion_proto::physical_plan::PhysicalPlanNodeExt;`.
- Foreign-foreign `From` / `TryFrom` conversions become `FromProto` / `TryFromProto` (see Design decisions above).
- `datafusion_proto::generated::*` is deprecated in favor of `datafusion_proto::protobuf`; it still works.

The new `proto` feature on `datafusion-physical-expr(-common)` is off by default, so crates that don't serialize plans pay nothing.
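The `FromProto` / `TryFromProto` workaround from the design decisions can be sketched roughly as follows. This is an illustration under assumptions: the trait shapes are guessed from the `Y::from_proto(&x)` call form quoted above, and `ProtoColumn`, `ProtoSide`, `Column` and `Side` are hypothetical types. Modules stand in for separate crates, so the orphan rule is not actually triggered in this single-file version.

```rust
/// Stand-in for `datafusion-proto-models`: prost-generated, foreign types.
mod proto_models {
    #[derive(Debug, Clone, PartialEq)]
    pub struct ProtoColumn {
        pub name: String,
        pub index: u32,
    }

    /// Hypothetical wire enum carried as a raw integer.
    #[derive(Debug, Clone, PartialEq)]
    pub struct ProtoSide(pub i32);
}

/// Stand-in for the crate that owns the target types: also foreign.
mod physical_expr {
    #[derive(Debug, PartialEq)]
    pub struct Column {
        pub name: String,
        pub index: usize,
    }

    #[derive(Debug, PartialEq)]
    pub enum Side {
        Left,
        Right,
    }
}

// With real crates, `impl From<&ProtoColumn> for Column` here would be
// rejected because neither the trait nor the types are local. A trait
// defined in the converting crate is local, so impls of it are allowed.
pub trait FromProto<T>: Sized {
    fn from_proto(value: &T) -> Self;
}

pub trait TryFromProto<T>: Sized {
    fn try_from_proto(value: &T) -> Result<Self, String>;
}

impl FromProto<proto_models::ProtoColumn> for physical_expr::Column {
    fn from_proto(value: &proto_models::ProtoColumn) -> Self {
        physical_expr::Column {
            name: value.name.clone(),
            index: value.index as usize,
        }
    }
}

impl TryFromProto<proto_models::ProtoSide> for physical_expr::Side {
    fn try_from_proto(value: &proto_models::ProtoSide) -> Result<Self, String> {
        match value.0 {
            0 => Ok(physical_expr::Side::Left),
            1 => Ok(physical_expr::Side::Right),
            other => Err(format!("unknown side tag {other}")),
        }
    }
}
```

A caller that previously wrote `(&proto).into()` would write `physical_expr::Column::from_proto(&proto)` instead, with the trait in scope.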
## Future work

- Migrate the remaining built-in expressions, including `DynamicFilterPhysicalExpr` (the original motivation), one per follow-up PR.
- Apply the same pattern to `ExecutionPlan` serialization.
- Drop the `FromProto` / `TryFromProto` workaround: collapse `datafusion-proto-common` into `datafusion-proto-models` and push the conversion impls down to the target-type crates so callers use plain `From` / `TryFrom` again.

Full dep-graph analysis and a step-by-step plan are in [#21835 (comment)](#21835 (comment)).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 50d74a7 commit 9962d27

40 files changed

Lines changed: 1348 additions & 429 deletions

File tree

.github/workflows/rust.yml

Lines changed: 0 additions & 1 deletion
@@ -605,7 +605,6 @@ jobs:
           rust-version: stable
       - name: Run
         run: |
-          echo '' > datafusion/proto/src/generated/datafusion.rs
           ci/scripts/rust_fmt.sh
 
   # Coverage job disabled due to

Cargo.lock

Lines changed: 17 additions & 6 deletions

Cargo.toml

Lines changed: 3 additions & 1 deletion
@@ -47,9 +47,10 @@ members = [
     "datafusion/pruning",
     "datafusion/physical-plan",
     "datafusion/proto",
-    "datafusion/proto/gen",
     "datafusion/proto-common",
     "datafusion/proto-common/gen",
+    "datafusion/proto-models",
+    "datafusion/proto-models/gen",
     "datafusion/session",
     "datafusion/spark",
     "datafusion/sql",
@@ -152,6 +153,7 @@ datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", versio
 datafusion-physical-plan = { path = "datafusion/physical-plan", version = "53.1.0" }
 datafusion-proto = { path = "datafusion/proto", version = "53.1.0" }
 datafusion-proto-common = { path = "datafusion/proto-common", version = "53.1.0" }
+datafusion-proto-models = { path = "datafusion/proto-models", version = "53.1.0" }
 datafusion-pruning = { path = "datafusion/pruning", version = "53.1.0" }
 datafusion-session = { path = "datafusion/session", version = "53.1.0" }
 datafusion-spark = { path = "datafusion/spark", version = "53.1.0" }

datafusion-examples/examples/custom_data_source/adapter_serialization.rs

Lines changed: 2 additions & 1 deletion
@@ -62,7 +62,8 @@ use datafusion_proto::bytes::{
 use datafusion_proto::physical_plan::from_proto::parse_physical_expr_with_converter;
 use datafusion_proto::physical_plan::to_proto::serialize_physical_expr_with_converter;
 use datafusion_proto::physical_plan::{
-    PhysicalExtensionCodec, PhysicalPlanDecodeContext, PhysicalProtoConverterExtension,
+    PhysicalExtensionCodec, PhysicalPlanDecodeContext, PhysicalPlanNodeExt,
+    PhysicalProtoConverterExtension,
 };
 use datafusion_proto::protobuf::physical_plan_node::PhysicalPlanType;
 use datafusion_proto::protobuf::{

datafusion-examples/examples/proto/expression_deduplication.rs

Lines changed: 1 addition & 1 deletion
@@ -52,7 +52,7 @@ use datafusion_proto::physical_plan::from_proto::parse_physical_expr_with_conver
 use datafusion_proto::physical_plan::to_proto::serialize_physical_expr_with_converter;
 use datafusion_proto::physical_plan::{
     DefaultPhysicalExtensionCodec, PhysicalExtensionCodec, PhysicalPlanDecodeContext,
-    PhysicalProtoConverterExtension,
+    PhysicalPlanNodeExt, PhysicalProtoConverterExtension,
 };
 use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode};
 use prost::Message;
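Both example files gain a `PhysicalPlanNodeExt` import because the plan-node conversion methods now live on a trait. A plausible reading, which the commit message does not spell out, is that inherent `impl` blocks cannot be written for a type owned by another crate, so the methods become an extension trait that callers must bring into scope. The sketch below shows that pattern with hypothetical names (`PlanNode`, `PlanNodeExt`, `try_from_plan`, `try_into_plan`); a module stands in for the `datafusion-proto-models` crate.

```rust
/// Stand-in for the crate that now owns the generated plan-node type.
mod proto_models {
    #[derive(Debug, Default, Clone, PartialEq)]
    pub struct PlanNode {
        pub label: String,
    }
}

/// Extension trait defined in the serialization crate. With real crates an
/// inherent `impl PlanNode { .. }` would not compile here, since `PlanNode`
/// belongs to a different crate; a local trait implemented for it does.
pub trait PlanNodeExt: Sized {
    fn try_from_plan(plan: &str) -> Result<Self, String>;
    fn try_into_plan(&self) -> Result<String, String>;
}

impl PlanNodeExt for proto_models::PlanNode {
    fn try_from_plan(plan: &str) -> Result<Self, String> {
        if plan.is_empty() {
            return Err("empty plan".to_string());
        }
        Ok(proto_models::PlanNode { label: plan.to_string() })
    }

    fn try_into_plan(&self) -> Result<String, String> {
        Ok(self.label.clone())
    }
}
```

Call sites keep the same method syntax, for example `PlanNode::try_from_plan("scan")`, but only compile once the trait is imported, which matches the one-line `use` change in the examples.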

datafusion/expr-common/src/operator.rs

Lines changed: 44 additions & 0 deletions
@@ -390,6 +390,50 @@ impl Operator {
             | Operator::StringConcat => false,
         }
     }
+
+    /// Parse an `Operator` from the string name `datafusion-proto` uses on the
+    /// wire (the `Debug` name of the variant, e.g. `"Eq"`).
+    ///
+    /// Returns `None` for names with no binary-operator counterpart. This is
+    /// the canonical proto-string mapping, shared by `datafusion-proto`
+    /// (logical plans) and `PhysicalExpr` decoders such as `BinaryExpr`, so the
+    /// mapping is not duplicated across crates.
+    pub fn from_proto_name(name: &str) -> Option<Operator> {
+        Some(match name {
+            "And" => Operator::And,
+            "Or" => Operator::Or,
+            "Eq" => Operator::Eq,
+            "NotEq" => Operator::NotEq,
+            "LtEq" => Operator::LtEq,
+            "Lt" => Operator::Lt,
+            "Gt" => Operator::Gt,
+            "GtEq" => Operator::GtEq,
+            "Plus" => Operator::Plus,
+            "Minus" => Operator::Minus,
+            "Multiply" => Operator::Multiply,
+            "Divide" => Operator::Divide,
+            "Modulo" => Operator::Modulo,
+            "IsDistinctFrom" => Operator::IsDistinctFrom,
+            "IsNotDistinctFrom" => Operator::IsNotDistinctFrom,
+            "BitwiseAnd" => Operator::BitwiseAnd,
+            "BitwiseOr" => Operator::BitwiseOr,
+            "BitwiseXor" => Operator::BitwiseXor,
+            "BitwiseShiftLeft" => Operator::BitwiseShiftLeft,
+            "BitwiseShiftRight" => Operator::BitwiseShiftRight,
+            "RegexIMatch" => Operator::RegexIMatch,
+            "RegexMatch" => Operator::RegexMatch,
+            "RegexNotIMatch" => Operator::RegexNotIMatch,
+            "RegexNotMatch" => Operator::RegexNotMatch,
+            "LikeMatch" => Operator::LikeMatch,
+            "ILikeMatch" => Operator::ILikeMatch,
+            "NotLikeMatch" => Operator::NotLikeMatch,
+            "NotILikeMatch" => Operator::NotILikeMatch,
+            "StringConcat" => Operator::StringConcat,
+            "AtArrow" => Operator::AtArrow,
+            "ArrowAt" => Operator::ArrowAt,
+            _ => return None,
+        })
+    }
 }
 
 impl fmt::Display for Operator {
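Since the doc comment says the wire name is the variant's `Debug` name, the encode side and `from_proto_name` should round-trip. The sketch below checks that property on a cut-down, hypothetical four-variant `Operator`; `proto_name` is an assumed helper name for the encode direction and is not part of this diff.

```rust
/// Cut-down stand-in for `datafusion_expr_common::operator::Operator`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Operator {
    Eq,
    NotEq,
    Plus,
    StringConcat,
}

impl Operator {
    /// Hypothetical encode-side helper: the wire name is the `Debug` name.
    pub fn proto_name(&self) -> String {
        format!("{self:?}")
    }

    /// Decode side, same shape as the method added in this diff.
    pub fn from_proto_name(name: &str) -> Option<Operator> {
        Some(match name {
            "Eq" => Operator::Eq,
            "NotEq" => Operator::NotEq,
            "Plus" => Operator::Plus,
            "StringConcat" => Operator::StringConcat,
            _ => return None,
        })
    }
}

/// Every variant must survive encode-then-decode unchanged.
pub fn round_trips(ops: &[Operator]) -> bool {
    ops.iter()
        .all(|op| Operator::from_proto_name(&op.proto_name()) == Some(*op))
}
```

A test of this kind also catches a variant that is added to the enum but forgotten in the string table, provided the list passed to `round_trips` is kept exhaustive.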

datafusion/physical-expr-common/Cargo.toml

Lines changed: 7 additions & 0 deletions
@@ -40,11 +40,18 @@ workspace = true
 [lib]
 name = "datafusion_physical_expr_common"
 
+[features]
+default = []
+# Enables the `PhysicalExpr::to_proto` hook used by `datafusion-proto`.
+# Off by default so crates that never serialize plans pay nothing.
+proto = ["dep:datafusion-proto-models"]
+
 [dependencies]
 arrow = { workspace = true }
 chrono = { workspace = true }
 datafusion-common = { workspace = true }
 datafusion-expr-common = { workspace = true }
+datafusion-proto-models = { workspace = true, optional = true }
 hashbrown = { workspace = true }
 indexmap = { workspace = true }
 itertools = { workspace = true }

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 174 additions & 0 deletions
@@ -477,6 +477,180 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
     fn expression_id(&self) -> Option<u64> {
         None
     }
+
+    /// Serialize this expression to a [`PhysicalExprNode`] proto message.
+    ///
+    /// Returning `Ok(None)` means "this expression does not know how to
+    /// serialize itself"; the caller (typically `datafusion-proto`) will fall
+    /// back to its existing codec / extension paths. This matches today's
+    /// behavior for expressions that aren't built into `datafusion-proto`.
+    ///
+    /// Returning `Ok(Some(node))` means the expression has serialized itself
+    /// fully; the caller should not try any further fallback path.
+    ///
+    /// Returning `Err(_)` means a real serialization failure (e.g. the
+    /// expression knows it should serialize but a child failed).
+    ///
+    /// The motivating use case is letting expressions with private state
+    /// (e.g. `DynamicFilterPhysicalExpr`'s `RwLock`-protected inner fields)
+    /// reach into their own internals for `try_to_proto`/`try_from_proto`
+    /// without having to expose `pub` accessors to `datafusion-proto`. See
+    /// <https://github.com/apache/datafusion/issues/21835>.
+    ///
+    /// The `try_` prefix matches the fallible `try_from_proto` decode
+    /// constructors (and the `TryFromProto` trait in `datafusion-proto`);
+    /// both sides of the round-trip are fallible and named consistently.
+    ///
+    /// [`PhysicalExprNode`]: datafusion_proto_models::protobuf::PhysicalExprNode
+    #[cfg(feature = "proto")]
+    fn try_to_proto(
+        &self,
+        _ctx: &proto_encode::PhysicalExprEncodeCtx<'_>,
+    ) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
+        Ok(None)
+    }
+}
+
+/// Encode-side context for [`PhysicalExpr::try_to_proto`].
+///
+/// Expression authors only ever see [`proto_encode::PhysicalExprEncodeCtx`]:
+/// a concrete struct with stable methods. Internally it dispatches to a
+/// [`proto_encode::PhysicalExprEncode`] implementor that lives in
+/// `datafusion-proto`, which is what lets `physical-expr-common` stay free
+/// of `datafusion-proto` as a dep.
+///
+/// More specialized helpers (e.g. encoding UDFs/UDAFs/UDWFs through the
+/// extension codec) can be added to the context as expressions migrate;
+/// today they're not required because the encoder forwards to the existing
+/// codec via the proto converter.
+#[cfg(feature = "proto")]
+pub mod proto_encode {
+    use std::sync::Arc;
+
+    use datafusion_common::Result;
+    use datafusion_proto_models::protobuf::PhysicalExprNode;
+
+    use super::PhysicalExpr;
+
+    /// Encoder context handed to [`super::PhysicalExpr::try_to_proto`].
+    ///
+    /// Wraps an internal [`PhysicalExprEncode`] trait object so callers see a
+    /// stable concrete type while implementations can evolve in
+    /// `datafusion-proto`.
+    pub struct PhysicalExprEncodeCtx<'a> {
+        encoder: &'a dyn PhysicalExprEncode,
+    }
+
+    impl<'a> PhysicalExprEncodeCtx<'a> {
+        /// Construct a new encode context. Typically called by
+        /// `datafusion-proto`; expression authors receive `&PhysicalExprEncodeCtx`.
+        pub fn new(encoder: &'a dyn PhysicalExprEncode) -> Self {
+            Self { encoder }
+        }
+
+        /// Encode a child expression. Routes through the configured encoder
+        /// so dedup-aware encoding is preserved.
+        pub fn encode_child(
+            &self,
+            expr: &Arc<dyn PhysicalExpr>,
+        ) -> Result<PhysicalExprNode> {
+            self.encoder.encode(expr)
+        }
+    }
+
+    /// Internal dispatch trait. Implementors live in `datafusion-proto` and
+    /// wrap the existing `PhysicalExtensionCodec` +
+    /// `PhysicalProtoConverterExtension` plumbing. Expression authors should
+    /// use [`PhysicalExprEncodeCtx`] instead of calling this directly.
+    pub trait PhysicalExprEncode {
+        /// Encode an expression to a protobuf node.
+        fn encode(&self, expr: &Arc<dyn PhysicalExpr>) -> Result<PhysicalExprNode>;
+    }
+}
+
+/// Decode-side counterpart to [`proto_encode`].
+///
+/// Expression authors implement an associated `try_from_proto` on their
+/// concrete type, with the signature
+///
+/// ```ignore
+/// fn try_from_proto(
+///     node: &PhysicalExprNode,
+///     ctx: &PhysicalExprDecodeCtx<'_>,
+/// ) -> Result<Arc<dyn PhysicalExpr>>
+/// ```
+///
+/// It takes the whole [`PhysicalExprNode`] — the exact inverse of what
+/// [`PhysicalExpr::try_to_proto`] returns — so the constructor can also see
+/// outer-node fields such as `expr_id`. The central match in
+/// `datafusion-proto` dispatches `ExprType` variants to these constructors.
+///
+/// As with the encode side, the public surface is a struct (not a `&dyn`
+/// trait) so future fields/helpers (registries for third-party expressions,
+/// schema-resolution caches, etc.) can be added without changing the
+/// signature every expression depends on.
+///
+/// [`PhysicalExprNode`]: datafusion_proto_models::protobuf::PhysicalExprNode
+#[cfg(feature = "proto")]
+pub mod proto_decode {
+    use std::sync::Arc;
+
+    use arrow::datatypes::Schema;
+    use datafusion_common::Result;
+    use datafusion_proto_models::protobuf::PhysicalExprNode;
+
+    use super::PhysicalExpr;
+
+    /// Decoder context handed to per-expression `try_from_proto` constructors.
+    ///
+    /// Wraps an internal [`PhysicalExprDecode`] trait object plus a borrowed
+    /// schema. The trait stays an implementation detail of `datafusion-proto`;
+    /// expression authors only see this struct.
+    pub struct PhysicalExprDecodeCtx<'a> {
+        schema: &'a Schema,
+        decoder: &'a dyn PhysicalExprDecode,
+    }
+
+    impl<'a> PhysicalExprDecodeCtx<'a> {
+        /// Construct a new decode context. Typically called by
+        /// `datafusion-proto`; expression authors receive
+        /// `&PhysicalExprDecodeCtx`.
+        pub fn new(schema: &'a Schema, decoder: &'a dyn PhysicalExprDecode) -> Self {
+            Self { schema, decoder }
+        }
+
+        /// The schema bound to this decode context. Use it for column lookups,
+        /// data-type resolution, etc.
+        pub fn schema(&self) -> &Schema {
+            self.schema
+        }
+
+        /// Decode an expression node, recursing into child sub-expressions.
+        ///
+        /// Routes built-in `ExprType` variants through `datafusion-proto`'s
+        /// central match and forwards extension nodes to the registered codec
+        /// (today via [`PhysicalExtensionCodec::try_decode_expr`]; later via
+        /// a per-type registry — see #21835).
+        ///
+        /// [`PhysicalExtensionCodec::try_decode_expr`]: https://docs.rs/datafusion-proto/latest/datafusion_proto/physical_plan/trait.PhysicalExtensionCodec.html#method.try_decode_expr
+        pub fn decode(&self, node: &PhysicalExprNode) -> Result<Arc<dyn PhysicalExpr>> {
+            self.decoder.decode(node, self.schema)
+        }
+    }
+
+    /// Internal dispatch trait. Implementors live in `datafusion-proto`.
+    /// Expression authors should use [`PhysicalExprDecodeCtx`] instead of
+    /// calling this directly.
+    pub trait PhysicalExprDecode {
+        /// Decode a proto node into a concrete `PhysicalExpr`. The schema is
+        /// passed alongside so implementations can support recursive children
+        /// and rebind the context per call (e.g. for nested plans).
+        fn decode(
+            &self,
+            node: &PhysicalExprNode,
+            schema: &Schema,
+        ) -> Result<Arc<dyn PhysicalExpr>>;
+    }
 }
 
 #[deprecated(
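The layering in this hunk (a concrete context struct wrapping a `&dyn` dispatch trait, with children encoded and decoded through the context) can be exercised end to end with toy types. The sketch below is an assumption-heavy miniature, not the real API: `Node`, `Expr`, `EncodeCtx`, `DecodeCtx`, `Encode`, `Decode`, `DefaultEncoder` and `DefaultDecoder` are all hypothetical stand-ins, and the schema argument is left out.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the prost-generated `PhysicalExprNode`.
#[derive(Debug, Clone, PartialEq)]
pub enum Node {
    Column { name: String },
    Not(Box<Node>),
}

/// Toy expression trait carrying only the encode hook.
pub trait Expr {
    fn try_to_proto(&self, ctx: &EncodeCtx<'_>) -> Result<Option<Node>, String>;
}

/// Dispatch traits, implemented once by the serialization crate.
pub trait Encode {
    fn encode(&self, expr: &Arc<dyn Expr>) -> Result<Node, String>;
}
pub trait Decode {
    fn decode(&self, node: &Node) -> Result<Arc<dyn Expr>, String>;
}

/// Concrete contexts: the only types expression authors touch.
pub struct EncodeCtx<'a> {
    encoder: &'a dyn Encode,
}
impl<'a> EncodeCtx<'a> {
    pub fn new(encoder: &'a dyn Encode) -> Self {
        Self { encoder }
    }
    pub fn encode_child(&self, expr: &Arc<dyn Expr>) -> Result<Node, String> {
        self.encoder.encode(expr)
    }
}
pub struct DecodeCtx<'a> {
    decoder: &'a dyn Decode,
}
impl<'a> DecodeCtx<'a> {
    pub fn new(decoder: &'a dyn Decode) -> Self {
        Self { decoder }
    }
    pub fn decode(&self, node: &Node) -> Result<Arc<dyn Expr>, String> {
        self.decoder.decode(node)
    }
}

pub struct Column {
    name: String, // private: only `Column` itself reads it
}
pub struct Not {
    arg: Arc<dyn Expr>,
}

impl Column {
    pub fn new(name: &str) -> Self {
        Self { name: name.to_string() }
    }
    pub fn try_from_proto(node: &Node, _ctx: &DecodeCtx<'_>) -> Result<Arc<dyn Expr>, String> {
        match node {
            Node::Column { name } => Ok(Arc::new(Column { name: name.clone() })),
            other => Err(format!("expected Column, got {other:?}")),
        }
    }
}
impl Expr for Column {
    fn try_to_proto(&self, _ctx: &EncodeCtx<'_>) -> Result<Option<Node>, String> {
        Ok(Some(Node::Column { name: self.name.clone() }))
    }
}

impl Not {
    pub fn new(arg: Arc<dyn Expr>) -> Self {
        Self { arg }
    }
    pub fn try_from_proto(node: &Node, ctx: &DecodeCtx<'_>) -> Result<Arc<dyn Expr>, String> {
        match node {
            // The child is decoded through the context, never directly.
            Node::Not(inner) => Ok(Arc::new(Not { arg: ctx.decode(inner)? })),
            other => Err(format!("expected Not, got {other:?}")),
        }
    }
}
impl Expr for Not {
    fn try_to_proto(&self, ctx: &EncodeCtx<'_>) -> Result<Option<Node>, String> {
        Ok(Some(Node::Not(Box::new(ctx.encode_child(&self.arg)?))))
    }
}

pub struct DefaultEncoder;
impl Encode for DefaultEncoder {
    fn encode(&self, expr: &Arc<dyn Expr>) -> Result<Node, String> {
        let ctx = EncodeCtx::new(self);
        expr.try_to_proto(&ctx)?.ok_or_else(|| "no encoder".to_string())
    }
}

pub struct DefaultDecoder;
impl Decode for DefaultDecoder {
    // Central match: dispatch each variant to its type's constructor.
    fn decode(&self, node: &Node) -> Result<Arc<dyn Expr>, String> {
        let ctx = DecodeCtx::new(self);
        match node {
            Node::Column { .. } => Column::try_from_proto(node, &ctx),
            Node::Not(_) => Not::try_from_proto(node, &ctx),
        }
    }
}
```

Because expressions only ever name `EncodeCtx` and `DecodeCtx`, new helper methods can be added to those structs later without changing any expression's signature, which is the design point the doc comments make.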

datafusion/physical-expr/Cargo.toml

Lines changed: 7 additions & 0 deletions
@@ -42,6 +42,12 @@ name = "datafusion_physical_expr"
 
 [features]
 recursive_protection = ["dep:recursive"]
+# Forwards the `proto` feature to `datafusion-physical-expr-common`, exposing
+# `PhysicalExpr::to_proto` and letting expressions in this crate implement it.
+proto = [
+    "dep:datafusion-proto-models",
+    "datafusion-physical-expr-common/proto",
+]
 
 [dependencies]
 arrow = { workspace = true }
@@ -50,6 +56,7 @@ datafusion-expr = { workspace = true }
 datafusion-expr-common = { workspace = true }
 datafusion-functions-aggregate-common = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
+datafusion-proto-models = { workspace = true, optional = true }
 hashbrown = { workspace = true }
 indexmap = { workspace = true }
 itertools = { workspace = true, features = ["use_std"] }
