Skip to content

Commit c884d40

Browse files
Merge pull request #149 from DataDog/codex/branch-54-proto-downcast-delegate
fix(proto): honor ExecutionPlan downcast_delegate during serialization
2 parents 888b137 + 19de38e commit c884d40

2 files changed

Lines changed: 71 additions & 2 deletions

File tree

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ impl protobuf::PhysicalPlanNode {
424424
Self: Sized,
425425
{
426426
let plan_clone = Arc::clone(&plan);
427-
let plan = plan.as_ref() as &dyn Any;
427+
let plan = plan.as_ref();
428428

429429
if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
430430
return protobuf::PhysicalPlanNode::try_from_explain_exec(exec, codec);

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ use datafusion::physical_plan::windows::{
8888
create_udwf_window_expr,
8989
};
9090
use datafusion::physical_plan::{
91-
ExecutionPlan, InputOrderMode, Partitioning, PhysicalExpr, Statistics, displayable,
91+
DisplayAs, DisplayFormatType, ExecutionPlan, InputOrderMode, Partitioning,
92+
PhysicalExpr, PlanProperties, SendableRecordBatchStream, Statistics, displayable,
9293
};
9394
use datafusion::prelude::{ParquetReadOptions, SessionContext};
9495
use datafusion::scalar::ScalarValue;
@@ -228,6 +229,74 @@ fn roundtrip_empty() -> Result<()> {
228229
roundtrip_test(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))))
229230
}
230231

232+
#[derive(Debug)]
233+
struct DowncastDelegatingExec {
234+
inner: Arc<dyn ExecutionPlan>,
235+
}
236+
237+
impl DowncastDelegatingExec {
238+
fn new(inner: Arc<dyn ExecutionPlan>) -> Self {
239+
Self { inner }
240+
}
241+
}
242+
243+
impl DisplayAs for DowncastDelegatingExec {
244+
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
245+
self.inner.fmt_as(t, f)
246+
}
247+
}
248+
249+
impl ExecutionPlan for DowncastDelegatingExec {
250+
fn name(&self) -> &str {
251+
self.inner.name()
252+
}
253+
254+
fn properties(&self) -> &Arc<PlanProperties> {
255+
self.inner.properties()
256+
}
257+
258+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
259+
self.inner.children()
260+
}
261+
262+
fn with_new_children(
263+
self: Arc<Self>,
264+
children: Vec<Arc<dyn ExecutionPlan>>,
265+
) -> Result<Arc<dyn ExecutionPlan>> {
266+
let inner = Arc::clone(&self.inner).with_new_children(children)?;
267+
Ok(Arc::new(Self::new(inner)))
268+
}
269+
270+
fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
271+
Some(self.inner.as_ref())
272+
}
273+
274+
fn execute(
275+
&self,
276+
partition: usize,
277+
context: Arc<TaskContext>,
278+
) -> Result<SendableRecordBatchStream> {
279+
self.inner.execute(partition, context)
280+
}
281+
}
282+
283+
#[test]
284+
fn serialize_uses_downcast_delegate() -> Result<()> {
285+
let inner: Arc<dyn ExecutionPlan> =
286+
Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
287+
let plan: Arc<dyn ExecutionPlan> = Arc::new(DowncastDelegatingExec::new(inner));
288+
let codec = DefaultPhysicalExtensionCodec {};
289+
290+
let proto = PhysicalPlanNode::try_from_physical_plan(plan, &codec)?;
291+
292+
assert!(matches!(
293+
proto.physical_plan_type,
294+
Some(protobuf::physical_plan_node::PhysicalPlanType::Empty(_))
295+
));
296+
297+
Ok(())
298+
}
299+
231300
#[test]
232301
fn roundtrip_date_time_interval() -> Result<()> {
233302
let schema = Schema::new(vec![

0 commit comments

Comments
 (0)