Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,18 +716,21 @@ impl protobuf::PhysicalPlanNode {
})?;

let filter_selectivity = filter.default_filter_selectivity.try_into();
let projection = if !filter.projection.is_empty() {
Some(
filter
.projection
.iter()
.map(|i| *i as usize)
.collect::<Vec<_>>(),
)
} else {
// Determine if the projection is full to optimize used memory,
// storing `None` in this case.
Comment thread
Adez017 marked this conversation as resolved.
Outdated
let num_fields = input.schema().fields().len();
Comment thread
Adez017 marked this conversation as resolved.
let mut is_full_projection = filter.projection.len() == num_fields;
let mut projection_vec: Vec<usize> = Vec::with_capacity(filter.projection.len());
for (i, idx) in filter.projection.iter().enumerate() {
let idx = *idx as usize;
is_full_projection &= idx == i;
projection_vec.push(idx);
}
let projection = if is_full_projection {
None
} else {
Some(projection_vec)
};

let filter = FilterExecBuilder::new(predicate, input)
.apply_projection(projection)?
.with_batch_size(filter.batch_size as usize)
Expand Down Expand Up @@ -2339,9 +2342,12 @@ impl protobuf::PhysicalPlanNode {
.physical_expr_to_proto(exec.predicate(), codec)?,
),
default_filter_selectivity: exec.default_selectivity() as u32,
projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
}),
projection: match exec.projection() {
None => (0..exec.input().schema().fields().len())
.map(|i| i as u32)
.collect(),
Some(v) => v.iter().map(|x| *x as u32).collect(),
},
batch_size: exec.batch_size() as u32,
fetch: exec.fetch().map(|f| f as u32),
},
Expand Down
60 changes: 60 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3681,3 +3681,63 @@ async fn roundtrip_issue_18602_complex_filter_decode_recursion() -> Result<()> {

roundtrip_test_sql_with_context(sql, &ctx).await
}

#[tokio::test]
async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> {
let ctx = SessionContext::new();
let codec = DefaultPhysicalExtensionCodec {};

let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
]));

let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::clone(&schema)));

let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
));

// Case 1: None -> should round-trip as None (return all columns)
let filter =
FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)).build()?;
let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?;
let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?;
let rt = roundtripped.as_ref().downcast_ref::<FilterExec>().unwrap();
assert_eq!(
rt.projection().as_deref(),
None,
"None projection must stay None after roundtrip"
);

Comment thread
Adez017 marked this conversation as resolved.
// Case 2: Some(vec![]) -> must survive as Some([]), NOT silently become None
let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
.apply_projection(Some(vec![]))?
.build()?;
let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?;
let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?;
let rt = roundtripped.as_ref().downcast_ref::<FilterExec>().unwrap();
assert_eq!(
rt.projection().as_deref(),
Some(&[][..]),
"Empty projection Some([]) must survive roundtrip, not become None"
);

// Case 3: Some(vec![2, 0]) -> partial projection must survive
let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
.apply_projection(Some(vec![2, 0]))?
.build()?;
let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?;
let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?;
let rt = roundtripped.as_ref().downcast_ref::<FilterExec>().unwrap();
assert_eq!(
rt.projection().as_deref(),
Some(&[2_usize, 0_usize][..]),
"Partial projection must survive roundtrip"
);

Ok(())
}
Loading