Skip to content

Commit cdc0151

Browse files
authored
fix: Column extraction for visitors should not rely on schema order (delta-io#1818)
## What changes are proposed in this pull request?

Before this change, column extraction assumed that columns would be extracted in the order they appear in the schema. This prerequisite is easy to violate, and this code is typically not on the hot path, so we can afford to be more robust. According to the [trait docs](https://github.com/delta-io/delta-kernel-rs/blob/main/kernel/src/engine_data.rs#L294):

```
/// The names and types of leaf fields this visitor accesses. The `EngineData` being visited
/// validates these types when extracting column getters, and [`RowVisitor::visit`] will receive
/// one getter for each selected field, in the requested order.
```

## How was this change tested?

Added an additional unit test to demonstrate the issue.
1 parent 09b026f commit cdc0151

1 file changed

Lines changed: 213 additions & 28 deletions

File tree

kernel/src/engine/arrow_data.rs

Lines changed: 213 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::{HashMap, HashSet};
1+
use std::collections::HashMap;
22
use std::sync::Arc;
33

44
use itertools::Itertools;
@@ -216,6 +216,16 @@ impl ProvidesColumnsAndFields for StructArray {
216216
}
217217
}
218218

219+
/// Tracks the state of a column path while getters are being extracted.
///
/// Each requested leaf column is registered as `AwaitingGetter`, and each ancestor path of a
/// requested leaf is registered as `Parent`. During extraction, an `AwaitingGetter` entry is
/// replaced by `HasGetter` once its getter has been obtained.
220+
enum ColumnState<'a> {
221+
/// Ancestor path of a requested leaf; extraction recurses into it when it is a struct array
222+
Parent,
223+
/// Leaf column awaiting a getter; carries the requested type that is passed to leaf extraction
224+
AwaitingGetter(&'a DataType),
225+
/// Leaf column with getter successfully extracted; seeing the same path again is an error
226+
HasGetter(&'a dyn GetData<'a>),
227+
}
228+
219229
impl EngineData for ArrowEngineData {
220230
fn len(&self) -> usize {
221231
self.data.num_rows()
@@ -237,20 +247,48 @@ impl EngineData for ArrowEngineData {
237247
.with_backtrace());
238248
}
239249

240-
// Collect the names of all leaf columns we want to extract, along with their parents, to
241-
// guide our depth-first extraction. If the list contains any non-leaf, duplicate, or
242-
// missing column references, the extracted column list will be too short (error out below).
243-
let mask_capacity: usize = leaf_columns.iter().map(|c| c.len()).sum();
244-
let mut mask = HashSet::with_capacity(mask_capacity);
245-
for column in leaf_columns {
246-
for i in 0..column.len() {
247-
mask.insert(&column[..i + 1]);
250+
// Build a map tracking the state of each column path:
251+
// - Parent: used for traversal into nested structs
252+
// - AwaitingGetter: leaf column that needs a getter extracted
253+
// - HasGetter: leaf column with getter successfully extracted (set during extraction)
254+
//
255+
// This is used to guide our depth-first extraction. If the list contains any non-leaf,
256+
// duplicate, or missing column references, the extracted column list will be too
257+
// short (error out below).
258+
let mut column_map = HashMap::new();
259+
260+
for (column, data_type) in leaf_columns.iter().zip(leaf_types.iter()) {
261+
column_map.insert(column.clone(), ColumnState::AwaitingGetter(data_type));
262+
let mut cur_parent = column.parent();
263+
while let Some(parent) = cur_parent {
264+
column_map
265+
.entry(parent.clone())
266+
.or_insert(ColumnState::Parent);
267+
cur_parent = parent.parent();
248268
}
249269
}
250-
debug!("Column mask for selected columns {leaf_columns:?} is {mask:#?}");
270+
debug!(
271+
"Column map for selected columns {leaf_columns:?} has {} entries",
272+
column_map.len()
273+
);
274+
275+
// Extract all columns, transitioning AwaitingGetter -> HasGetter
276+
Self::extract_columns(&mut vec![], &mut column_map, &self.data)?;
251277

278+
// Extract getters in the requested column order, verifying state transitions
252279
let mut getters = Vec::with_capacity(leaf_columns.len());
253-
Self::extract_columns(&mut vec![], &mut getters, leaf_types, &mask, &self.data)?;
280+
for column in leaf_columns {
281+
match column_map.get(column.as_ref()) {
282+
Some(ColumnState::HasGetter(getter)) => getters.push(*getter),
283+
_ => {
284+
return Err(Error::MissingColumn(format!(
285+
"Column {} not found in the data",
286+
column
287+
)));
288+
}
289+
}
290+
}
291+
254292
if getters.len() != leaf_columns.len() {
255293
return Err(Error::MissingColumn(format!(
256294
"Visitor expected {} leaf columns, but only {} were found in the data",
@@ -298,27 +336,41 @@ impl EngineData for ArrowEngineData {
298336
impl ArrowEngineData {
299337
fn extract_columns<'a>(
300338
path: &mut Vec<String>,
301-
getters: &mut Vec<&'a dyn GetData<'a>>,
302-
leaf_types: &[DataType],
303-
column_mask: &HashSet<&[String]>,
339+
column_map: &mut HashMap<ColumnName, ColumnState<'a>>,
304340
data: &'a dyn ProvidesColumnsAndFields,
305341
) -> DeltaResult<()> {
306342
for (column, field) in data.columns().iter().zip(data.fields()) {
307343
path.push(field.name().to_string());
308-
if column_mask.contains(&path[..]) {
309-
if let Some(struct_array) = column.as_struct_opt() {
310-
debug!(
311-
"Recurse into a struct array for {}",
312-
ColumnName::new(path.iter())
313-
);
314-
Self::extract_columns(path, getters, leaf_types, column_mask, struct_array)?;
315-
} else if column.data_type() == &ArrowDataType::Null {
316-
debug!("Pushing a null array for {}", ColumnName::new(path.iter()));
317-
getters.push(&());
318-
} else {
319-
let data_type = &leaf_types[getters.len()];
320-
let getter = Self::extract_leaf_column(path, data_type, column)?;
321-
getters.push(getter);
344+
345+
// Check if this path is in our column map and mutate state if needed
346+
if let Some(state) = column_map.get_mut(path.as_slice()) {
347+
match state {
348+
ColumnState::Parent => {
349+
// Parent path - recurse if it's a struct
350+
if let Some(struct_array) = column.as_struct_opt() {
351+
debug!(
352+
"Recurse into a struct array for {}",
353+
ColumnName::new(path.iter())
354+
);
355+
Self::extract_columns(path, column_map, struct_array)?;
356+
}
357+
}
358+
ColumnState::AwaitingGetter(data_type) => {
359+
// Leaf column - extract and transition to HasGetter
360+
let getter = if column.data_type() == &ArrowDataType::Null {
361+
debug!("Pushing a null array for {}", ColumnName::new(path.iter()));
362+
&() as &'a dyn GetData<'a>
363+
} else {
364+
Self::extract_leaf_column(path, data_type, column)?
365+
};
366+
*state = ColumnState::HasGetter(getter);
367+
}
368+
ColumnState::HasGetter(_) => {
369+
return Err(Error::internal_error(format!(
370+
"Column {} already has a getter - duplicate column?",
371+
ColumnName::new(path.iter())
372+
)));
373+
}
322374
}
323375
} else {
324376
debug!("Skipping unmasked path {}", ColumnName::new(path.iter()));
@@ -978,6 +1030,139 @@ mod tests {
9781030
Ok(())
9791031
}
9801032

1033+
/// Checks that `visit_rows` hands getters to the visitor in the order the columns were
/// requested, even when that order differs from the order of the fields in the batch schema.
#[test]
1034+
fn test_column_ordering_independence() -> DeltaResult<()> {
1035+
use crate::arrow::array::StructArray;
1036+
use crate::engine_data::{GetData, TypedGetData};
1037+
use crate::schema::ColumnName;
1038+
1039+
// Leaf columns in schema order: field_a, field_b, nested.x, nested.y
1040+
let nested_fields = vec![
1041+
ArrowField::new("x", ArrowDataType::Int32, false),
1042+
ArrowField::new("y", ArrowDataType::Int32, false),
1043+
];
1044+
let batch = RecordBatch::try_new(
1045+
Arc::new(ArrowSchema::new(vec![
1046+
ArrowField::new("field_a", ArrowDataType::Int32, false),
1047+
ArrowField::new("field_b", ArrowDataType::Int32, false),
1048+
ArrowField::new(
1049+
"nested",
1050+
ArrowDataType::Struct(nested_fields.clone().into()),
1051+
false,
1052+
),
1053+
])),
1054+
vec![
1055+
Arc::new(Int32Array::from(vec![1, 2])),
1056+
Arc::new(Int32Array::from(vec![10, 20])),
1057+
Arc::new(StructArray::try_new(
1058+
nested_fields.into(),
1059+
vec![
1060+
Arc::new(Int32Array::from(vec![100, 200])),
1061+
Arc::new(Int32Array::from(vec![1000, 2000])),
1062+
],
1063+
None,
1064+
)?),
1065+
],
1066+
)?;
1067+
1068+
// Column names requested in an interleaved order (nested.y, field_b, nested.x, field_a)
// that deliberately differs from schema order
1069+
use std::sync::LazyLock;
1070+
static REQUESTED_COLUMNS: LazyLock<Vec<ColumnName>> = LazyLock::new(|| {
1071+
vec![
1072+
ColumnName::new(["nested", "y"]),
1073+
ColumnName::new(["field_b"]),
1074+
ColumnName::new(["nested", "x"]),
1075+
ColumnName::new(["field_a"]),
1076+
]
1077+
});
1078+
1079+
struct Visitor {
1080+
values: Vec<(i32, i32, i32, i32)>,
1081+
}
1082+
impl crate::engine_data::RowVisitor for Visitor {
1083+
fn selected_column_names_and_types(
1084+
&self,
1085+
) -> (&'static [ColumnName], &'static [DataType]) {
1086+
use std::sync::LazyLock;
1087+
static TYPES: LazyLock<Vec<DataType>> =
1088+
LazyLock::new(|| vec![DataType::INTEGER; 4]);
1089+
(&REQUESTED_COLUMNS, &TYPES)
1090+
}
1091+
1092+
fn visit<'a>(
1093+
&mut self,
1094+
row_count: usize,
1095+
getters: &[&'a dyn GetData<'a>],
1096+
) -> DeltaResult<()> {
1097+
for i in 0..row_count {
1098+
self.values.push((
1099+
getters[0].get(i, "nested.y")?,
1100+
getters[1].get(i, "field_b")?,
1101+
getters[2].get(i, "nested.x")?,
1102+
getters[3].get(i, "field_a")?,
1103+
));
1104+
}
1105+
Ok(())
1106+
}
1107+
}
1108+
1109+
let mut visitor = Visitor { values: vec![] };
1110+
ArrowEngineData::new(batch).visit_rows(&REQUESTED_COLUMNS, &mut visitor)?;
1111+
1112+
// Each tuple is (nested.y, field_b, nested.x, field_a): requested order, not schema order
1113+
assert_eq!(visitor.values, vec![(1000, 10, 100, 1), (2000, 20, 200, 2)]);
1114+
Ok(())
1115+
}
1116+
1117+
/// Checks that `visit_rows` reports an error when the data contains two columns with the same
/// name and that name is requested by the visitor.
#[test]
1118+
fn test_visit_duplicate_column_error() -> DeltaResult<()> {
1119+
use crate::engine_data::RowVisitor;
1120+
use crate::schema::ColumnName;
1121+
use std::sync::LazyLock;
1122+
1123+
// Create a batch whose two top-level columns share the name `field_a`
1124+
let batch = RecordBatch::try_new(
1125+
Arc::new(ArrowSchema::new(vec![
1126+
ArrowField::new("field_a", ArrowDataType::Int32, false),
1127+
ArrowField::new("field_a", ArrowDataType::Int32, false), // Duplicate column name
1128+
])),
1129+
vec![
1130+
Arc::new(Int32Array::from(vec![1, 2])),
1131+
Arc::new(Int32Array::from(vec![10, 20])),
1132+
],
1133+
)?;
1134+
1135+
// Request the duplicated name once; it matches both physical columns in the batch
1136+
static REQUESTED_COLUMNS: LazyLock<Vec<ColumnName>> =
1137+
LazyLock::new(|| vec![ColumnName::new(["field_a"])]);
1138+
1139+
struct DummyVisitor;
1140+
impl RowVisitor for DummyVisitor {
1141+
fn selected_column_names_and_types(
1142+
&self,
1143+
) -> (&'static [ColumnName], &'static [DataType]) {
1144+
static TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| vec![DataType::INTEGER]);
1145+
(&REQUESTED_COLUMNS, &TYPES)
1146+
}
1147+
fn visit<'a>(
1148+
&mut self,
1149+
_row_count: usize,
1150+
_getters: &[&'a dyn crate::engine_data::GetData<'a>],
1151+
) -> DeltaResult<()> {
1152+
Ok(())
1153+
}
1154+
}
1155+
1156+
let mut visitor = DummyVisitor;
1157+
let result = ArrowEngineData::new(batch).visit_rows(&REQUESTED_COLUMNS, &mut visitor);
1158+
1159+
assert_result_error_with_message(
1160+
result,
1161+
"Column field_a already has a getter - duplicate column?",
1162+
);
1163+
Ok(())
1164+
}
1165+
9811166
#[test]
9821167
fn test_run_array_out_of_bounds_errors() -> DeltaResult<()> {
9831168
// Test that out of bounds errors include field name for all types

0 commit comments

Comments
 (0)