Skip to content

Commit 072dc05

Browse files
author
Bert Vermeiren
committed
Fix: optimize projections for unnest logical plan
1 parent f02a0d9 commit 072dc05

5 files changed

Lines changed: 243 additions & 231 deletions

File tree

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 8 additions & 192 deletions
Original file line number | Diff line number | Diff line change
@@ -48,16 +48,15 @@ use crate::{
4848
};
4949

5050
use super::dml::InsertOp;
51-
use super::plan::{ColumnUnnestList, ExplainFormat};
51+
use super::plan::ExplainFormat;
5252
use arrow::compute::can_cast_types;
5353
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
5454
use datafusion_common::display::ToStringifiedPlan;
5555
use datafusion_common::file_options::file_type::FileType;
5656
use datafusion_common::{
57-
exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
58-
plan_datafusion_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef,
59-
DataFusionError, NullEquality, Result, ScalarValue, TableReference, ToDFSchema,
60-
UnnestOptions,
57+
exec_err, get_target_functional_dependencies, not_impl_err, plan_datafusion_err,
58+
plan_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, NullEquality,
59+
Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
6160
};
6261
use datafusion_expr_common::type_coercion::binary::type_union_resolution;
6362

@@ -2045,27 +2044,6 @@ pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
20452044
unnest_with_options(input, columns, UnnestOptions::default())
20462045
}
20472046

2048-
// Get the data type of a multi-dimensional type after unnesting it
2049-
// with a given depth
2050-
fn get_unnested_list_datatype_recursive(
2051-
data_type: &DataType,
2052-
depth: usize,
2053-
) -> Result<DataType> {
2054-
match data_type {
2055-
DataType::List(field)
2056-
| DataType::FixedSizeList(field, _)
2057-
| DataType::LargeList(field) => {
2058-
if depth == 1 {
2059-
return Ok(field.data_type().clone());
2060-
}
2061-
return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
2062-
}
2063-
_ => {}
2064-
};
2065-
2066-
internal_err!("trying to unnest on invalid data type {:?}", data_type)
2067-
}
2068-
20692047
pub fn get_struct_unnested_columns(
20702048
col_name: &String,
20712049
inner_fields: &Fields,
@@ -2076,53 +2054,6 @@ pub fn get_struct_unnested_columns(
20762054
.collect()
20772055
}
20782056

2079-
// Based on data type, either struct or a variant of list
2080-
// return a set of columns as the result of unnesting
2081-
// the input columns.
2082-
// For example, given a column with name "a",
2083-
// - List(Element) returns ["a"] with data type Element
2084-
// - Struct(field1, field2) returns ["a.field1","a.field2"]
2085-
// For list data type, an argument depth is used to specify
2086-
// the recursion level
2087-
pub fn get_unnested_columns(
2088-
col_name: &String,
2089-
data_type: &DataType,
2090-
depth: usize,
2091-
) -> Result<Vec<(Column, Arc<Field>)>> {
2092-
let mut qualified_columns = Vec::with_capacity(1);
2093-
2094-
match data_type {
2095-
DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
2096-
let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
2097-
let new_field = Arc::new(Field::new(
2098-
col_name, data_type,
2099-
// Unnesting may produce NULLs even if the list is not null.
2100-
// For example: unnest([1], []) -> 1, null
2101-
true,
2102-
));
2103-
let column = Column::from_name(col_name);
2104-
// let column = Column::from((None, &new_field));
2105-
qualified_columns.push((column, new_field));
2106-
}
2107-
DataType::Struct(fields) => {
2108-
qualified_columns.extend(fields.iter().map(|f| {
2109-
let new_name = format!("{}.{}", col_name, f.name());
2110-
let column = Column::from_name(&new_name);
2111-
let new_field = f.as_ref().clone().with_name(new_name);
2112-
// let column = Column::from((None, &f));
2113-
(column, Arc::new(new_field))
2114-
}))
2115-
}
2116-
_ => {
2117-
return internal_err!(
2118-
"trying to unnest on invalid data type {:?}",
2119-
data_type
2120-
);
2121-
}
2122-
};
2123-
Ok(qualified_columns)
2124-
}
2125-
21262057
/// Create a [`LogicalPlan::Unnest`] plan with options
21272058
/// This function receives a list of columns to be unnested
21282059
/// because multiple unnests can be performed on the same column (e.g. unnest with different depths)
@@ -2157,126 +2088,11 @@ pub fn unnest_with_options(
21572088
columns_to_unnest: Vec<Column>,
21582089
options: UnnestOptions,
21592090
) -> Result<LogicalPlan> {
2160-
let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
2161-
let mut struct_columns = vec![];
2162-
let indices_to_unnest = columns_to_unnest
2163-
.iter()
2164-
.map(|c| Ok((input.schema().index_of_column(c)?, c)))
2165-
.collect::<Result<HashMap<usize, &Column>>>()?;
2166-
2167-
let input_schema = input.schema();
2168-
2169-
let mut dependency_indices = vec![];
2170-
// Transform input schema into new schema
2171-
// Given this comprehensive example
2172-
//
2173-
// input schema:
2174-
// 1.col1_unnest_placeholder: list[list[int]],
2175-
// 2.col1: list[list[int]]
2176-
// 3.col2: list[int]
2177-
// with unnest on unnest(col1,depth=2), unnest(col1,depth=1) and unnest(col2,depth=1)
2178-
// output schema:
2179-
// 1.unnest_col1_depth_2: int
2180-
// 2.unnest_col1_depth_1: list[int]
2181-
// 3.col1: list[list[int]]
2182-
// 4.unnest_col2_depth_1: int
2183-
// Meaning the placeholder column will be replaced by its unnested variation(s), note
2184-
// the plural.
2185-
let fields = input_schema
2186-
.iter()
2187-
.enumerate()
2188-
.map(|(index, (original_qualifier, original_field))| {
2189-
match indices_to_unnest.get(&index) {
2190-
Some(column_to_unnest) => {
2191-
let recursions_on_column = options
2192-
.recursions
2193-
.iter()
2194-
.filter(|p| -> bool { &p.input_column == *column_to_unnest })
2195-
.collect::<Vec<_>>();
2196-
let mut transformed_columns = recursions_on_column
2197-
.iter()
2198-
.map(|r| {
2199-
list_columns.push((
2200-
index,
2201-
ColumnUnnestList {
2202-
output_column: r.output_column.clone(),
2203-
depth: r.depth,
2204-
},
2205-
));
2206-
Ok(get_unnested_columns(
2207-
&r.output_column.name,
2208-
original_field.data_type(),
2209-
r.depth,
2210-
)?
2211-
.into_iter()
2212-
.next()
2213-
.unwrap()) // because unnesting a list column always result into one result
2214-
})
2215-
.collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
2216-
if transformed_columns.is_empty() {
2217-
transformed_columns = get_unnested_columns(
2218-
&column_to_unnest.name,
2219-
original_field.data_type(),
2220-
1,
2221-
)?;
2222-
match original_field.data_type() {
2223-
DataType::Struct(_) => {
2224-
struct_columns.push(index);
2225-
}
2226-
DataType::List(_)
2227-
| DataType::FixedSizeList(_, _)
2228-
| DataType::LargeList(_) => {
2229-
list_columns.push((
2230-
index,
2231-
ColumnUnnestList {
2232-
output_column: Column::from_name(
2233-
&column_to_unnest.name,
2234-
),
2235-
depth: 1,
2236-
},
2237-
));
2238-
}
2239-
_ => {}
2240-
};
2241-
}
2242-
2243-
// new columns dependent on the same original index
2244-
dependency_indices
2245-
.extend(std::iter::repeat_n(index, transformed_columns.len()));
2246-
Ok(transformed_columns
2247-
.iter()
2248-
.map(|(col, field)| (col.relation.to_owned(), field.to_owned()))
2249-
.collect())
2250-
}
2251-
None => {
2252-
dependency_indices.push(index);
2253-
Ok(vec![(
2254-
original_qualifier.cloned(),
2255-
Arc::clone(original_field),
2256-
)])
2257-
}
2258-
}
2259-
})
2260-
.collect::<Result<Vec<_>>>()?
2261-
.into_iter()
2262-
.flatten()
2263-
.collect::<Vec<_>>();
2264-
2265-
let metadata = input_schema.metadata().clone();
2266-
let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
2267-
// We can use the existing functional dependencies:
2268-
let deps = input_schema.functional_dependencies().clone();
2269-
let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
2270-
2271-
Ok(LogicalPlan::Unnest(Unnest {
2272-
input: Arc::new(input),
2273-
exec_columns: columns_to_unnest,
2274-
list_type_columns: list_columns,
2275-
struct_type_columns: struct_columns,
2276-
dependency_indices,
2277-
schema,
2091+
Ok(LogicalPlan::Unnest(Unnest::try_new(
2092+
Arc::new(input),
2093+
columns_to_unnest,
22782094
options,
2279-
}))
2095+
)?))
22802096
}
22812097

22822098
#[cfg(test)]

0 commit comments

Comments
 (0)