Skip to content

Commit 50d74a7

Browse files
proto: add proto converter reference to PhysicalExtensionCodec trait (apache#21055)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes apache#21056 ## Rationale for this change Custom `ExecutionPlan` nodes that store `PhysicalExpr` fields cannot participate in expression deduplication during serialization because `PhysicalExtensionCodec::try_encode`/`try_decode` lack access to the `PhysicalProtoConverterExtension`. ## What changes are included in this PR? - Added `proto_converter: &dyn PhysicalProtoConverterExtension` parameter to `PhysicalExtensionCodec::try_decode` and `try_encode` - Updated all implementations: `DefaultPhysicalExtensionCodec`, `ComposedPhysicalExtensionCodec`, `ForeignPhysicalExtensionCodec` (FFI), and all examples - Updated call sites in `PhysicalPlanNode` serialization/deserialization - FFI bridge passes `DefaultPhysicalProtoConverter` as a fallback ## Are these changes tested? Yes — added `test_custom_node_with_dynamic_filter_dedup_roundtrip` which verifies that a `DynamicFilterPhysicalExpr` shared between a `FilterExec` and a custom `ExecutionPlan` node preserves its shared inner state after a roundtrip through `DeduplicatingProtoConverter`. ## Are there any user-facing changes? Breaking API change: `PhysicalExtensionCodec::try_decode` and `try_encode` now take an additional `&dyn PhysicalProtoConverterExtension` parameter. All custom codec implementations must be updated.
1 parent ad6a507 commit 50d74a7

6 files changed

Lines changed: 301 additions & 24 deletions

File tree

datafusion-examples/examples/custom_data_source/adapter_serialization.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec {
275275
buf: &[u8],
276276
inputs: &[Arc<dyn ExecutionPlan>],
277277
_ctx: &TaskContext,
278+
_proto_converter: &dyn PhysicalProtoConverterExtension,
278279
) -> Result<Arc<dyn ExecutionPlan>> {
279280
// Try to parse as our extension payload
280281
if let Ok(payload) = serde_json::from_slice::<ExtensionPayload>(buf)
@@ -303,6 +304,7 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec {
303304
&self,
304305
_node: Arc<dyn ExecutionPlan>,
305306
_buf: &mut Vec<u8>,
307+
_proto_converter: &dyn PhysicalProtoConverterExtension,
306308
) -> Result<()> {
307309
// We don't need this for the example - adapter wrapping happens in
308310
// `execution_plan_to_proto` instead.

datafusion-examples/examples/proto/composed_extension_codec.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
4343
use datafusion::prelude::SessionContext;
4444
use datafusion_proto::physical_plan::{
4545
AsExecutionPlan, ComposedPhysicalExtensionCodec, PhysicalExtensionCodec,
46+
PhysicalProtoConverterExtension,
4647
};
4748
use datafusion_proto::protobuf;
4849

@@ -145,6 +146,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
145146
buf: &[u8],
146147
inputs: &[Arc<dyn ExecutionPlan>],
147148
_ctx: &TaskContext,
149+
_proto_converter: &dyn PhysicalProtoConverterExtension,
148150
) -> Result<Arc<dyn ExecutionPlan>> {
149151
if buf == "ParentExec".as_bytes() {
150152
Ok(Arc::new(ParentExec {
@@ -155,7 +157,12 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
155157
}
156158
}
157159

158-
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
160+
fn try_encode(
161+
&self,
162+
node: Arc<dyn ExecutionPlan>,
163+
buf: &mut Vec<u8>,
164+
_proto_converter: &dyn PhysicalProtoConverterExtension,
165+
) -> Result<()> {
159166
if node.is::<ParentExec>() {
160167
buf.extend_from_slice("ParentExec".as_bytes());
161168
Ok(())
@@ -226,6 +233,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
226233
buf: &[u8],
227234
_inputs: &[Arc<dyn ExecutionPlan>],
228235
_ctx: &TaskContext,
236+
_proto_converter: &dyn PhysicalProtoConverterExtension,
229237
) -> Result<Arc<dyn ExecutionPlan>> {
230238
if buf == "ChildExec".as_bytes() {
231239
Ok(Arc::new(ChildExec {}))
@@ -234,7 +242,12 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
234242
}
235243
}
236244

237-
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
245+
fn try_encode(
246+
&self,
247+
node: Arc<dyn ExecutionPlan>,
248+
buf: &mut Vec<u8>,
249+
_proto_converter: &dyn PhysicalProtoConverterExtension,
250+
) -> Result<()> {
238251
if node.is::<ChildExec>() {
239252
buf.extend_from_slice("ChildExec".as_bytes());
240253
Ok(())

datafusion-examples/examples/proto/expression_deduplication.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ impl PhysicalExtensionCodec for CachingCodec {
187187
_buf: &[u8],
188188
_inputs: &[Arc<dyn ExecutionPlan>],
189189
_ctx: &TaskContext,
190+
_proto_converter: &dyn PhysicalProtoConverterExtension,
190191
) -> Result<Arc<dyn ExecutionPlan>> {
191192
datafusion::common::not_impl_err!("No custom extension nodes")
192193
}
@@ -196,6 +197,7 @@ impl PhysicalExtensionCodec for CachingCodec {
196197
&self,
197198
_node: Arc<dyn ExecutionPlan>,
198199
_buf: &mut Vec<u8>,
200+
_proto_converter: &dyn PhysicalProtoConverterExtension,
199201
) -> Result<()> {
200202
datafusion::common::not_impl_err!("No custom extension nodes")
201203
}

datafusion/ffi/src/proto/physical_extension_codec.rs

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ use datafusion_expr::{
2525
AggregateUDF, AggregateUDFImpl, ScalarUDF, ScalarUDFImpl, WindowUDF, WindowUDFImpl,
2626
};
2727
use datafusion_physical_plan::ExecutionPlan;
28-
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
28+
use datafusion_proto::physical_plan::{
29+
DefaultPhysicalProtoConverter, PhysicalExtensionCodec,
30+
PhysicalProtoConverterExtension,
31+
};
2932

3033
use stabby::slice::Slice as SSlice;
3134
use stabby::str::Str as SStr;
@@ -145,8 +148,12 @@ unsafe extern "C" fn try_decode_fn_wrapper(
145148
.collect::<Result<Vec<_>>>();
146149
let inputs = sresult_return!(inputs);
147150

148-
let plan =
149-
sresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref()));
151+
let plan = sresult_return!(codec.try_decode(
152+
buf.as_ref(),
153+
&inputs,
154+
task_ctx.as_ref(),
155+
&DefaultPhysicalProtoConverter {},
156+
));
150157

151158
FFI_Result::Ok(FFI_ExecutionPlan::new(plan, runtime))
152159
}
@@ -160,7 +167,11 @@ unsafe extern "C" fn try_encode_fn_wrapper(
160167
let plan: Arc<dyn ExecutionPlan> = sresult_return!((&node).try_into());
161168

162169
let mut bytes = Vec::new();
163-
sresult_return!(codec.try_encode(plan, &mut bytes));
170+
sresult_return!(codec.try_encode(
171+
plan,
172+
&mut bytes,
173+
&DefaultPhysicalProtoConverter {}
174+
));
164175

165176
FFI_Result::Ok(bytes.into_iter().collect())
166177
}
@@ -335,6 +346,7 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec {
335346
buf: &[u8],
336347
inputs: &[Arc<dyn ExecutionPlan>],
337348
_ctx: &TaskContext,
349+
_proto_converter: &dyn PhysicalProtoConverterExtension,
338350
) -> Result<Arc<dyn ExecutionPlan>> {
339351
let inputs = inputs
340352
.iter()
@@ -348,7 +360,12 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec {
348360
Ok(plan)
349361
}
350362

351-
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
363+
fn try_encode(
364+
&self,
365+
node: Arc<dyn ExecutionPlan>,
366+
buf: &mut Vec<u8>,
367+
_proto_converter: &dyn PhysicalProtoConverterExtension,
368+
) -> Result<()> {
352369
let plan = FFI_ExecutionPlan::new(node, None);
353370
let bytes = df_result!(unsafe { (self.0.try_encode)(&self.0, plan) })?;
354371

@@ -426,7 +443,10 @@ pub(crate) mod tests {
426443
use datafusion_functions_aggregate::sum::Sum;
427444
use datafusion_functions_window::rank::{Rank, RankType};
428445
use datafusion_physical_plan::ExecutionPlan;
429-
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
446+
use datafusion_proto::physical_plan::{
447+
DefaultPhysicalProtoConverter, PhysicalExtensionCodec,
448+
PhysicalProtoConverterExtension,
449+
};
430450

431451
use crate::execution_plan::tests::EmptyExec;
432452
use crate::proto::physical_extension_codec::FFI_PhysicalExtensionCodec;
@@ -449,6 +469,7 @@ pub(crate) mod tests {
449469
buf: &[u8],
450470
_inputs: &[Arc<dyn ExecutionPlan>],
451471
_ctx: &TaskContext,
472+
_proto_converter: &dyn PhysicalProtoConverterExtension,
452473
) -> Result<Arc<dyn ExecutionPlan>> {
453474
if buf[0] != Self::MAGIC_NUMBER {
454475
return exec_err!(
@@ -467,6 +488,7 @@ pub(crate) mod tests {
467488
&self,
468489
node: Arc<dyn ExecutionPlan>,
469490
buf: &mut Vec<u8>,
491+
_proto_converter: &dyn PhysicalProtoConverterExtension,
470492
) -> Result<()> {
471493
buf.push(Self::MAGIC_NUMBER);
472494

@@ -587,10 +609,18 @@ pub(crate) mod tests {
587609
let exec = create_test_exec();
588610
let input_execs = [create_test_exec()];
589611
let mut bytes = Vec::new();
590-
foreign_codec.try_encode(Arc::clone(&exec), &mut bytes)?;
591-
592-
let returned_exec =
593-
foreign_codec.try_decode(&bytes, &input_execs, ctx.task_ctx().as_ref())?;
612+
foreign_codec.try_encode(
613+
Arc::clone(&exec),
614+
&mut bytes,
615+
&DefaultPhysicalProtoConverter {},
616+
)?;
617+
618+
let returned_exec = foreign_codec.try_decode(
619+
&bytes,
620+
&input_execs,
621+
ctx.task_ctx().as_ref(),
622+
&DefaultPhysicalProtoConverter {},
623+
)?;
594624

595625
assert!(returned_exec.is::<EmptyExec>());
596626

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ impl protobuf::PhysicalPlanNode {
669669
}
670670

671671
let mut buf: Vec<u8> = vec![];
672-
match codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
672+
match codec.try_encode(Arc::clone(&plan_clone), &mut buf, proto_converter) {
673673
Ok(_) => {
674674
let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
675675
.children()
@@ -1843,9 +1843,12 @@ impl protobuf::PhysicalPlanNode {
18431843
.map(|i| proto_converter.proto_to_execution_plan(i, ctx))
18441844
.collect::<Result<_>>()?;
18451845

1846-
let extension_node =
1847-
ctx.codec()
1848-
.try_decode(extension.node.as_slice(), &inputs, ctx.task_ctx())?;
1846+
let extension_node = ctx.codec().try_decode(
1847+
extension.node.as_slice(),
1848+
&inputs,
1849+
ctx.task_ctx(),
1850+
proto_converter,
1851+
)?;
18491852

18501853
Ok(extension_node)
18511854
}
@@ -3852,9 +3855,15 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync + Any {
38523855
buf: &[u8],
38533856
inputs: &[Arc<dyn ExecutionPlan>],
38543857
ctx: &TaskContext,
3858+
proto_converter: &dyn PhysicalProtoConverterExtension,
38553859
) -> Result<Arc<dyn ExecutionPlan>>;
38563860

3857-
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;
3861+
fn try_encode(
3862+
&self,
3863+
node: Arc<dyn ExecutionPlan>,
3864+
buf: &mut Vec<u8>,
3865+
proto_converter: &dyn PhysicalProtoConverterExtension,
3866+
) -> Result<()>;
38583867

38593868
fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
38603869
not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
@@ -3908,6 +3917,7 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
39083917
_buf: &[u8],
39093918
_inputs: &[Arc<dyn ExecutionPlan>],
39103919
_ctx: &TaskContext,
3920+
_proto_converter: &dyn PhysicalProtoConverterExtension,
39113921
) -> Result<Arc<dyn ExecutionPlan>> {
39123922
not_impl_err!("PhysicalExtensionCodec is not provided")
39133923
}
@@ -3916,6 +3926,7 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
39163926
&self,
39173927
_node: Arc<dyn ExecutionPlan>,
39183928
_buf: &mut Vec<u8>,
3929+
_proto_converter: &dyn PhysicalProtoConverterExtension,
39193930
) -> Result<()> {
39203931
not_impl_err!("PhysicalExtensionCodec is not provided")
39213932
}
@@ -4239,12 +4250,22 @@ impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
42394250
buf: &[u8],
42404251
inputs: &[Arc<dyn ExecutionPlan>],
42414252
ctx: &TaskContext,
4253+
proto_converter: &dyn PhysicalProtoConverterExtension,
42424254
) -> Result<Arc<dyn ExecutionPlan>> {
4243-
self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx))
4255+
self.decode_protobuf(buf, |codec, data| {
4256+
codec.try_decode(data, inputs, ctx, proto_converter)
4257+
})
42444258
}
42454259

4246-
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
4247-
self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data))
4260+
fn try_encode(
4261+
&self,
4262+
node: Arc<dyn ExecutionPlan>,
4263+
buf: &mut Vec<u8>,
4264+
proto_converter: &dyn PhysicalProtoConverterExtension,
4265+
) -> Result<()> {
4266+
self.encode_protobuf(buf, |codec, data| {
4267+
codec.try_encode(Arc::clone(&node), data, proto_converter)
4268+
})
42484269
}
42494270

42504271
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {

0 commit comments

Comments
 (0)