Commit baef8f0
Add support for logical and physical codecs (#1541)
* feat: unify logical + physical proto codec stack via SessionContext
Introduces a single composable codec layer that every serializer reads
from the session, replacing the hardcoded `DefaultLogicalExtensionCodec`
/ `DefaultPhysicalExtensionCodec` calls scattered across PyLogicalPlan,
PyExecutionPlan, and the Rust-wrapped Python provider plumbing.
Key changes:
* New `PythonLogicalCodec` and `PythonPhysicalCodec` (crates/core/src/codec.rs)
wrap any inner `LogicalExtensionCodec` / `PhysicalExtensionCodec`. Both
share a `DFPYUDF1` magic-prefix path for in-band cloudpickle encoding
of Python scalar UDFs, so an `ExecutionPlan` / `PhysicalExpr`
referencing a Python `ScalarUDF` round-trips through either layer.
Magic-prefix registry table (DFPYUDF1 in use; DFPYUDA1 / DFPYUDW1 /
DFPYPE1 reserved) documented in the module header.
* `PySessionContext` stores `Arc<PythonLogicalCodec>` and
`Arc<PythonPhysicalCodec>` directly. FFI wrappers are built on demand
via `ffi_logical_codec()` / `ffi_physical_codec()` for capsule export
and downstream `RustWrappedPy*` consumers. Adds
`__datafusion_physical_extension_codec__` getter +
`with_physical_extension_codec` setter (symmetric with the logical
pair).
* `PyLogicalPlan.to_proto` / `from_proto` renamed to `to_bytes` /
`from_bytes`, now reading the session's logical codec. `to_proto` /
`from_proto` survive as deprecated thin wrappers emitting
`DeprecationWarning`.
* `PyExecutionPlan` gains the same `to_bytes` / `from_bytes` rename +
deprecated aliases, plus `__datafusion_execution_plan__` capsule
getter and `from_pycapsule` (ported from poc_ffi_query_planner).
* New `PyPhysicalExpr` class with `to_bytes` / `from_bytes` /
`from_pycapsule` / `__datafusion_physical_expr__`. `from_bytes`
takes an input pyarrow Schema for column-reference resolution.
* `datafusion-python-util` gains `from_pycapsule!` /
`try_from_pycapsule!` macros + `physical_codec_from_pycapsule`,
`task_context_from_pycapsule`, `create_physical_extension_capsule`
(ported from poc_ffi_query_planner).
* `PythonFunctionScalarUDF` exposes `func()`, `input_fields()`,
`return_field()`, `volatility()`, `from_parts()` accessors needed
by the codec.
Python wrapper updates: `LogicalPlan` / `ExecutionPlan` add
`to_bytes` / `from_bytes` + deprecate `to_proto` / `from_proto`;
`ExecutionPlan` adds capsule getter + `from_pycapsule`; new
`PhysicalExpr` wrapper class exported from the top-level package;
`SessionContext` exposes the physical codec capsule + setter.
Test coverage in python/tests/test_plans.py: round-trip via new API,
deprecation warnings on old API, capsule protocol getters,
session-routed codec on both layers.
`PyLogicalPlan` PyCapsule protocol is intentionally not added —
`datafusion-ffi` does not expose `FFI_LogicalPlan`, so there is no
stable cross-crate shape to publish. Round-tripping a `LogicalPlan`
goes through `to_bytes` / `from_bytes` only.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* test: FFI-example integration tests for codec + plan capsule APIs
Adds four downstream-crate fixtures in `datafusion-ffi-example` so the
new PR1 surface can be tested with the same FFI-handoff pattern used
for table providers, UDFs, etc. Existing tests prove the API exists;
these tests prove it composes with code that lives in another crate.
New Rust types in `examples/datafusion-ffi-example/src/`:
* `MyLogicalExtensionCodec` — delegates to
`DefaultLogicalExtensionCodec` and bumps atomic counters on the UDF
encode/decode entry points. Exported via
`__datafusion_logical_extension_codec__`. Installed onto a session
with `ctx.with_logical_extension_codec(my_codec)`.
* `MyPhysicalExtensionCodec` — mirror for `PhysicalExtensionCodec`.
* `MyExecutionPlan` — wraps a one-column `EmptyExec`, exposes
`__datafusion_execution_plan__`. Lets the receiver consume an
`ExecutionPlan` capsule that did not originate in
datafusion-python.
* `MyPhysicalExpr` — wraps `Literal(Int32(42))`, exposes
`__datafusion_physical_expr__`. Same FFI handoff for physical
expressions.
New tests:
* `_test_logical_extension_codec.py` — codec installs cleanly, the
session re-exports its capsule, and `try_encode_udf` fires on the
user codec when serializing a plan that references a `ScalarUDF`.
The decode counterpart is a round-trip check rather than a counter
assertion: when the UDF is in the receiver's function registry,
`parse_expr` resolves by name before consulting the codec.
* `_test_physical_extension_codec.py` — symmetric.
* `_test_execution_plan.py` — parametrized over typed-class vs
raw-capsule input; verifies `ExecutionPlan.from_pycapsule` consumes
the downstream capsule.
* `_test_physical_expr.py` — same for `PhysicalExpr.from_pycapsule`.
API changes forced by the new tests:
* `PyLogicalPlan.to_bytes`, `PyExecutionPlan.to_bytes`,
`PyPhysicalExpr.to_bytes` now accept an optional `ctx` parameter.
When supplied, encoding routes through the session's installed
codec instead of a fresh default. `ctx=None` preserves the previous
default-codec behavior used by the deprecated `to_proto` shims.
* The util `from_pycapsule!` / `try_from_pycapsule!` macros now
validate the capsule name via `pointer_checked(Some(c"..."))`
rather than `pointer_checked(None)`. The latter rejects named
capsules outright with CPython's "incorrect name" error.
* `SessionContext.with_logical_extension_codec` and
`with_physical_extension_codec` now wrap the returned internal
context in `SessionContext` so the result has the full Python
surface. The pre-existing logical setter was returning a raw
internal object that lacked `sql()` and friends.
`examples/datafusion-ffi-example/Cargo.toml` gains `datafusion` and
`datafusion-proto` workspace dependencies for the new Rust impls.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* refactor: tighten PR1 scope to codec plumbing only
Review feedback pass. PR1 is now strictly the composable codec layer +
session routing + class-method serialization API. Anything that
touches actual Python UDF inline encoding or Python expression
wrapping moves to PR2 alongside the pickle work.
Dropped:
* `encode_python_scalar_udf` / `decode_python_scalar_udf` helpers
from `crates/core/src/codec.rs`, along with cloudpickle and pyarrow
imports. The wrapper codecs now delegate every method to `inner`.
`DFPYUDF1` magic constant is kept (marked `dead_code` for now) as a
reservation so PR2 has a single definition site.
* `udf.rs` reverted to pre-PR1 shape. The codec no longer needs
`func()` / `input_fields()` / `volatility()` / `from_parts()`
accessors. Re-added by PR2 when scalar-UDF inlining lands.
* `PyPhysicalExpr` class + Python wrapper + `__init__` export +
`MyPhysicalExpr` FFI fixture + `_test_physical_expr.py`. No
consumer in PR1 or PR2 plan documents; symmetry with
`PyExecutionPlan` is not enough to justify the surface area.
* Rust-side `PyLogicalPlan::to_proto` / `from_proto` and
`PyExecutionPlan::to_proto` / `from_proto` deprecated wrappers.
The deprecation lives entirely in the Python wrapper layer, which
emits `DeprecationWarning` and forwards to `to_bytes` /
`from_bytes`. Less Rust duplication.
* `PythonLogicalCodec::with_default_inner` /
`PythonPhysicalCodec::with_default_inner` — redundant with
`impl Default`. Logic moved into `Default::default`.
* `PySessionContext::default_logical_codec` /
`default_physical_codec` helpers. Inlined as
`Arc::new(PythonLogicalCodec::default())` at the three call sites.
Tests (root: 1076, FFI example: 36) all green after the cuts.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* remove unuseful code comments
* docs: rewrite codec module comments around purpose, not PR sequence
The previous doc-block framed PythonLogicalCodec / PythonPhysicalCodec
in terms of "PR1 delegates, PR2 will add encoding" — useful for
review, useless for someone reading the code later.
Reframed in terms of what the codecs exist to do: encode Python-side
plan references (pure-Python UDFs, etc.) into the proto wire format
so plans can cross process boundaries without the receiver having to
pre-register every callable. The wrappers sit at the top of the
session's codec stack and delegate non-Python encoding to a
composable inner codec.
Magic-prefix registry table loses the "reserved" column. Doc still
notes that the in-module impls currently delegate and that
encoder/decoder hooks land alongside the corresponding Python-side
serialization work.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* feat(codec): forward every LogicalExtensionCodec /
PhysicalExtensionCodec method to inner
PythonLogicalCodec previously only overrode the four required methods
on the trait plus the scalar UDF pair, so the default trait impls
(returning "LogicalExtensionCodec is not provided") shadowed any
downstream FFI codec for file formats, aggregate UDFs, and window
UDFs. A user installing their own codec via
`SessionContext.with_logical_extension_codec(...)` would silently
lose access to its `try_*_file_format`, `try_*_udaf`,
`try_*_udwf` implementations.
Forward every trait method to `inner` so the user-installed codec is
fully reachable. Same change on the physical side, including
`try_*_expr`, `try_*_udaf`, `try_*_udwf` — the corresponding
Python-aware paths can layer on later by intercepting before
delegation.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* docs: tighten codec dispatch test docstrings
The previous docstrings claimed the tests verify "PythonLogicalCodec
delegates non-Python UDFs to the inner codec." That's
forward-looking — the codecs currently delegate every UDF
unconditionally, so the test would behave identically for Python and
non-Python UDFs.
Rewrite to describe what the test actually proves: the dispatch chain
`PyLogicalPlan.to_bytes -> session.logical_codec -> PythonLogicalCodec
-> FFI -> user impl` (and the physical mirror) forwards correctly,
observable via the user codec's atomic counter incrementing after one
encode pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* refactor(ffi-example): MyExecutionPlan emits real data via
MemorySourceConfig
Was a one-column `EmptyExec` stub useful only as a capsule-handoff
target. Promoted to a minimal reference impl that a downstream Rust
crate can copy when exposing a custom `ExecutionPlan` to
datafusion-python: configurable `num_rows`, produces a single batch
of sequential `Int32` values under column `value`, wrapped in
`DataSourceExec` via `MemorySourceConfig::try_new_exec`. Header
comment explains the typical use case (remote backend, streaming
source, synthetic data generator) and the
`__datafusion_execution_plan__` capsule shape downstream crates
should follow.
Test asserts the schema-bearing plan survives the FFI hop: a
`DataSourceExec` arrives with the expected partitioning and no
children. Schema details are not surfaced through the FFI display
path (only the wrapping `ForeignExecutionPlan` name + inner plan
name appear), so the test does not assert the column name.
`to_bytes` round-trip of an FFI-imported plan is not exercised:
encoding requires a physical codec that knows how to serialize
`ForeignExecutionPlan`, which the default codec does not. A
downstream user round-tripping such a plan must install their own
codec via `with_physical_extension_codec`. Documented in the test
file rather than asserted on.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* refactor: drop dormant ExecutionPlan PyCapsule round-trip
`PyExecutionPlan::from_pycapsule` and the matching
`__datafusion_execution_plan__` exporter have no consumer in this
repo, on the POC `poc_ffi_query_planner` branch, or on any sibling
branch (`testing/datafusion-distributed`, `testing/ffi-library-marker`,
`tmp/ffi-with-codecs`). The pair was wired up speculatively for FFI
plan handoff that no Python code path actually performs today.
Drop the whole capsule round-trip for `ExecutionPlan`:
* Rust `PyExecutionPlan::from_pycapsule` and
`__datafusion_execution_plan__`.
* Python `ExecutionPlan.from_pycapsule` and
`__datafusion_execution_plan__` wrappers.
* `MyExecutionPlan` FFI fixture + `_test_execution_plan.py` + lib.rs
registration. Was solely a test fixture for the dropped path.
* `test_execution_plan_pycapsule_protocol` in `python/tests/test_plans.py`.
`PyExecutionPlan.to_bytes` / `from_bytes` survive — they encode
through the session's physical codec and have real coverage.
Capsule round-trip can be re-added when a concrete consumer
(distributed worker, bridge library) lands.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* feat: PyExpr.to_bytes / from_bytes via session logical codec
Mirrors PyLogicalPlan / PyExecutionPlan: encode through the session's
installed `LogicalExtensionCodec` (or a default-inner
`PythonLogicalCodec` when no `ctx` is supplied), decode against the
session's function registry + codec via `parse_expr`.
Rust side calls `datafusion_proto::logical_plan::to_proto::serialize_expr`
and `from_proto::parse_expr`. Python wrapper threads an optional
`SessionContext` through.
Tests cover the session-routed roundtrip and the no-ctx default-codec
encode path. Adds a third consumer of `session.logical_codec()`
alongside `PyLogicalPlan` and the codec dispatch tests in the FFI
example, broadening coverage of the codec stack.
This is the last piece of the PR1 codec surface — follow-up pickle
work (`Expr.__reduce__`, worker-scoped context, multiprocessing) can
build on this without bundling the byte-level serialization API.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* test(ffi-example): assert codec roundtrip restores plan output
PR review feedback: weak `is not None` checks let regressions slip
past. Mirror python/tests/test_plans.py — logical compares
`df.collect() == round_trip.collect()`; physical compares
`str(original) == str(restored)`.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>1 parent 55870ff commit baef8f0
20 files changed
Lines changed: 1235 additions & 70 deletions
File tree
- crates
- core/src
- sql
- util
- src
- examples/datafusion-ffi-example
- python/tests
- src
- python
- datafusion
- tests
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
| 1 | + | |
| 2 | + | |
| 3 | + | |
| 4 | + | |
| 5 | + | |
| 6 | + | |
| 7 | + | |
| 8 | + | |
| 9 | + | |
| 10 | + | |
| 11 | + | |
| 12 | + | |
| 13 | + | |
| 14 | + | |
| 15 | + | |
| 16 | + | |
| 17 | + | |
| 18 | + | |
| 19 | + | |
| 20 | + | |
| 21 | + | |
| 22 | + | |
| 23 | + | |
| 24 | + | |
| 25 | + | |
| 26 | + | |
| 27 | + | |
| 28 | + | |
| 29 | + | |
| 30 | + | |
| 31 | + | |
| 32 | + | |
| 33 | + | |
| 34 | + | |
| 35 | + | |
| 36 | + | |
| 37 | + | |
| 38 | + | |
| 39 | + | |
| 40 | + | |
| 41 | + | |
| 42 | + | |
| 43 | + | |
| 44 | + | |
| 45 | + | |
| 46 | + | |
| 47 | + | |
| 48 | + | |
| 49 | + | |
| 50 | + | |
| 51 | + | |
| 52 | + | |
| 53 | + | |
| 54 | + | |
| 55 | + | |
| 56 | + | |
| 57 | + | |
| 58 | + | |
| 59 | + | |
| 60 | + | |
| 61 | + | |
| 62 | + | |
| 63 | + | |
| 64 | + | |
| 65 | + | |
| 66 | + | |
| 67 | + | |
| 68 | + | |
| 69 | + | |
| 70 | + | |
| 71 | + | |
| 72 | + | |
| 73 | + | |
| 74 | + | |
| 75 | + | |
| 76 | + | |
| 77 | + | |
| 78 | + | |
| 79 | + | |
| 80 | + | |
| 81 | + | |
| 82 | + | |
| 83 | + | |
| 84 | + | |
| 85 | + | |
| 86 | + | |
| 87 | + | |
| 88 | + | |
| 89 | + | |
| 90 | + | |
| 91 | + | |
| 92 | + | |
| 93 | + | |
| 94 | + | |
| 95 | + | |
| 96 | + | |
| 97 | + | |
| 98 | + | |
| 99 | + | |
| 100 | + | |
| 101 | + | |
| 102 | + | |
| 103 | + | |
| 104 | + | |
| 105 | + | |
| 106 | + | |
| 107 | + | |
| 108 | + | |
| 109 | + | |
| 110 | + | |
| 111 | + | |
| 112 | + | |
| 113 | + | |
| 114 | + | |
| 115 | + | |
| 116 | + | |
| 117 | + | |
| 118 | + | |
| 119 | + | |
| 120 | + | |
| 121 | + | |
| 122 | + | |
| 123 | + | |
| 124 | + | |
| 125 | + | |
| 126 | + | |
| 127 | + | |
| 128 | + | |
| 129 | + | |
| 130 | + | |
| 131 | + | |
| 132 | + | |
| 133 | + | |
| 134 | + | |
| 135 | + | |
| 136 | + | |
| 137 | + | |
| 138 | + | |
| 139 | + | |
| 140 | + | |
| 141 | + | |
| 142 | + | |
| 143 | + | |
| 144 | + | |
| 145 | + | |
| 146 | + | |
| 147 | + | |
| 148 | + | |
| 149 | + | |
| 150 | + | |
| 151 | + | |
| 152 | + | |
| 153 | + | |
| 154 | + | |
| 155 | + | |
| 156 | + | |
| 157 | + | |
| 158 | + | |
| 159 | + | |
| 160 | + | |
| 161 | + | |
| 162 | + | |
| 163 | + | |
| 164 | + | |
| 165 | + | |
| 166 | + | |
| 167 | + | |
| 168 | + | |
| 169 | + | |
| 170 | + | |
| 171 | + | |
| 172 | + | |
| 173 | + | |
| 174 | + | |
| 175 | + | |
| 176 | + | |
| 177 | + | |
| 178 | + | |
| 179 | + | |
| 180 | + | |
| 181 | + | |
| 182 | + | |
| 183 | + | |
| 184 | + | |
| 185 | + | |
| 186 | + | |
| 187 | + | |
| 188 | + | |
| 189 | + | |
| 190 | + | |
| 191 | + | |
| 192 | + | |
| 193 | + | |
| 194 | + | |
| 195 | + | |
| 196 | + | |
| 197 | + | |
| 198 | + | |
| 199 | + | |
| 200 | + | |
| 201 | + | |
| 202 | + | |
| 203 | + | |
| 204 | + | |
| 205 | + | |
| 206 | + | |
| 207 | + | |
| 208 | + | |
| 209 | + | |
| 210 | + | |
| 211 | + | |
| 212 | + | |
| 213 | + | |
| 214 | + | |
| 215 | + | |
| 216 | + | |
| 217 | + | |
| 218 | + | |
| 219 | + | |
| 220 | + | |
| 221 | + | |
| 222 | + | |
| 223 | + | |
| 224 | + | |
| 225 | + | |
| 226 | + | |
| 227 | + | |
| 228 | + | |
| 229 | + | |
| 230 | + | |
| 231 | + | |
| 232 | + | |
| 233 | + | |
| 234 | + | |
| 235 | + | |
| 236 | + | |
| 237 | + | |
| 238 | + | |
| 239 | + | |
| 240 | + | |
| 241 | + | |
| 242 | + | |
| 243 | + | |
| 244 | + | |
| 245 | + | |
| 246 | + | |
| 247 | + | |
| 248 | + | |
| 249 | + | |
| 250 | + | |
| 251 | + | |
| 252 | + | |
| 253 | + | |
| 254 | + | |
| 255 | + | |
| 256 | + | |
| 257 | + | |
| 258 | + | |
| 259 | + | |
| 260 | + | |
| 261 | + | |
| 262 | + | |
| 263 | + | |
| 264 | + | |
| 265 | + | |
| 266 | + | |
| 267 | + | |
| 268 | + | |
| 269 | + | |
| 270 | + | |
| 271 | + | |
| 272 | + | |
| 273 | + | |
| 274 | + | |
| 275 | + | |
| 276 | + | |
| 277 | + | |
| 278 | + | |
| 279 | + | |
| 280 | + | |
| 281 | + | |
| 282 | + | |
| 283 | + | |
| 284 | + | |
| 285 | + | |
| 286 | + | |
0 commit comments