Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,18 +716,23 @@ impl protobuf::PhysicalPlanNode {
})?;

let filter_selectivity = filter.default_filter_selectivity.try_into();
let projection = if !filter.projection.is_empty() {
Some(
filter
.projection
.iter()
.map(|i| *i as usize)
.collect::<Vec<_>>(),
)
} else {
// Preserve the `None` state across proto boundaries. Proto cannot distinguish
// between `None` (full projection) and `Some(vec![])` (empty projection) since
// both serialize as an empty list. If all columns are included, we reconstruct
// `None` to avoid losing this semantic distinction on deserialization.
let num_fields = input.schema().fields().len();
Comment thread
Adez017 marked this conversation as resolved.
let mut is_full_projection = filter.projection.len() == num_fields;
let mut projection_vec: Vec<usize> = Vec::with_capacity(filter.projection.len());
for (i, idx) in filter.projection.iter().enumerate() {
let idx = *idx as usize;
is_full_projection &= idx == i;
projection_vec.push(idx);
}
let projection = if is_full_projection {
None
} else {
Some(projection_vec)
};

let filter = FilterExecBuilder::new(predicate, input)
.apply_projection(projection)?
.with_batch_size(filter.batch_size as usize)
Expand Down Expand Up @@ -2339,9 +2344,12 @@ impl protobuf::PhysicalPlanNode {
.physical_expr_to_proto(exec.predicate(), codec)?,
),
default_filter_selectivity: exec.default_selectivity() as u32,
projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
}),
projection: match exec.projection() {
None => (0..exec.input().schema().fields().len())
.map(|i| i as u32)
.collect(),
Some(v) => v.iter().map(|x| *x as u32).collect(),
},
batch_size: exec.batch_size() as u32,
fetch: exec.fetch().map(|f| f as u32),
},
Expand Down
37 changes: 37 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3681,3 +3681,40 @@ async fn roundtrip_issue_18602_complex_filter_decode_recursion() -> Result<()> {

roundtrip_test_sql_with_context(sql, &ctx).await
}

#[test]
fn roundtrip_filter_with_none_projection() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
]));
let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Gt,
lit(ScalarValue::Int32(Some(0))),
));
let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::clone(&schema)));

// Case 1: None projection (return all columns)
roundtrip_test(Arc::new(FilterExec::try_new(
Arc::clone(&predicate),
Arc::clone(&input),
)?))?;

Comment thread
Adez017 marked this conversation as resolved.
// Case 2: Some(vec![]) — explicitly empty projection
roundtrip_test(Arc::new(
FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
.apply_projection(Some(vec![]))?
.build()?,
))?;

// Case 3: Some(vec![2, 0]) — partial projection
roundtrip_test(Arc::new(
FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
.apply_projection(Some(vec![2, 0]))?
.build()?,
))?;

Ok(())
}
Loading