diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index ae97534d83..f6393beae9 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -16,7 +16,6 @@ // under the License. use std::collections::{HashMap, HashSet}; -use std::ops::Not; use std::sync::Arc; use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray}; @@ -27,8 +26,6 @@ use super::delete_filter::{DeleteFilter, PosDelLoadAction}; use crate::arrow::delete_file_loader::BasicDeleteFileLoader; use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema}; use crate::delete_vector::DeleteVector; -use crate::expr::Predicate::AlwaysTrue; -use crate::expr::{Predicate, Reference}; use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; use crate::spec::{ @@ -38,6 +35,41 @@ use crate::spec::{ }; use crate::{Error, ErrorKind, Result}; +/// A composite key for equality delete lookups. Each element corresponds to one +/// equality_id column. For single-column deletes this contains one element. +#[derive(Hash, Eq, PartialEq, Debug, Clone)] +pub(crate) struct EqDeleteKey(pub(crate) Vec>); + +/// Bundles the hash set of delete keys with the field metadata needed to extract +/// matching keys from data record batches. +#[derive(Debug, Clone)] +pub(crate) struct EqDeleteSet { + /// Delete key tuples to filter out of data batches. + pub(crate) keys: HashSet, + /// Ordered list of (field_name, field_id) used to locate the key columns in + /// data record batches. The order matches the element order in `EqDeleteKey`. + pub(crate) fields: Vec<(String, i32)>, +} + +impl EqDeleteSet { + fn new(fields: Vec<(String, i32)>) -> Self { + Self { + keys: HashSet::new(), + fields, + } + } + + /// Returns true when the set contains no delete keys. + pub(crate) fn is_empty(&self) -> bool { + self.keys.is_empty() + } + + /// Merge another set (with the same field layout) into this one. + pub(crate) fn union(&mut self, other: &EqDeleteSet) { + self.keys.extend(other.keys.iter().cloned()); + } +} + #[derive(Clone, Debug)] pub(crate) struct CachingDeleteFileLoader { basic_delete_file_loader: BasicDeleteFileLoader, @@ -59,7 +91,7 @@ enum DeleteFileContext { FreshEqDel { batch_stream: ArrowRecordBatchStream, equality_ids: HashSet, - sender: tokio::sync::oneshot::Sender, + sender: tokio::sync::oneshot::Sender>, }, } @@ -99,16 +131,17 @@ impl CachingDeleteFileLoader { /// another concurrently processing data file scan task. If it is, we skip it. /// If not, the DeleteFilter is updated to contain a notifier to prevent other data file /// tasks from starting to load the same equality delete file. We spawn a task to load - /// the EQ delete's record batch stream, convert it to a predicate, update the delete filter, - /// and notify any task that was waiting for it. + /// the EQ delete's record batch stream, convert it to an `EqDeleteSet` (hash set of + /// delete key tuples), update the delete filter, and notify any task that was waiting + /// for it. /// * When this gets updated to add support for delete vectors, the load phase will return /// a PuffinReader for them. /// * The parse phase parses each record batch stream according to its associated data type. /// The result of this is a map of data file paths to delete vectors for the positional /// delete tasks (and in future for the delete vector tasks). 
For equality delete - /// file tasks, this results in an unbound Predicate. - /// * The unbound Predicates resulting from equality deletes are sent to their associated oneshot - /// channel to store them in the right place in the delete file managers state. + /// file tasks, this results in an `EqDeleteSet` (hash set of delete key tuples). + /// * The `EqDeleteSet`s resulting from equality deletes are sent to their associated oneshot + /// channel to store them in the right place in the delete file manager's state. /// * The results of all of these futures are awaited on in parallel with the specified /// level of concurrency and collected into a vec. We then combine all the delete /// vector maps that resulted from any positional delete or delete vector files into a @@ -130,7 +163,7 @@ impl CachingDeleteFileLoader { /// Pos Del Del Vec (Not yet Implemented) EQ Del /// | | | /// [parse pos del stream] [parse del vec puffin] [parse eq del] - /// HashMap HashMap (Predicate, Sender) + /// HashMap HashMap (EqDeleteSet, Sender) /// | | | /// | | [persist to state] /// | | () @@ -249,9 +282,17 @@ impl CachingDeleteFileLoader { let (sender, receiver) = channel(); del_filter.insert_equality_delete(&task.file_path, receiver); - // Per the Iceberg spec, evolve schema for equality deletes but only for the - // equality_ids columns, not all table columns. - let equality_ids_vec = task.equality_ids.clone().unwrap(); + // Per the Iceberg spec, equality_ids is required for equality delete files. + // Evolve schema only for the equality_ids columns, not all table columns. + let equality_ids_vec = task.equality_ids.clone().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "equality_ids is required for equality delete file '{}' but was not set", + task.file_path + ), + ) + })?; let evolved_stream = BasicDeleteFileLoader::evolve_schema( basic_delete_file_loader .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) @@ -293,16 +334,16 @@ impl CachingDeleteFileLoader { batch_stream, equality_ids, } => { - let predicate = + let eq_delete_set = Self::parse_equality_deletes_record_batch_stream(batch_stream, equality_ids) .await?; sender - .send(predicate) + .send(Arc::new(eq_delete_set)) .map_err(|err| { Error::new( ErrorKind::Unexpected, - "Could not send eq delete predicate to state", + "Could not send eq delete set to state", ) }) .map(|_| ParsedDeleteFileContext::EqDel) @@ -354,19 +395,23 @@ impl CachingDeleteFileLoader { Ok(result) } + /// Parses equality delete record batches into a hash-based delete set. + /// + /// We collect delete key tuples into a `HashSet` for O(1) per-row lookups. async fn parse_equality_deletes_record_batch_stream( mut stream: ArrowRecordBatchStream, equality_ids: HashSet, - ) -> Result { - let mut row_predicates = Vec::new(); + ) -> Result { let mut batch_schema_iceberg: Option = None; let accessor = EqDelRecordBatchPartnerAccessor; + // Discover field metadata from the first non-empty batch. + let mut eq_delete_set: Option = None; while let Some(record_batch) = stream.next().await { let record_batch = record_batch?; if record_batch.num_columns() == 0 { - return Ok(AlwaysTrue); + return Ok(EqDeleteSet::new(Vec::new())); } let schema = match &batch_schema_iceberg { @@ -388,49 +433,37 @@ impl CachingDeleteFileLoader { continue; } - // Process the collected columns in lockstep + // Lazily initialize the EqDeleteSet with field metadata from the + // first batch that has columns. 
The field order is stable across
+            // batches because it comes from the delete file schema.
+            let delete_set = eq_delete_set.get_or_insert_with(|| {
+                let fields = datum_columns_with_names
+                    .iter()
+                    .map(|(_, name, field_id)| (name.clone(), *field_id))
+                    .collect();
+                EqDeleteSet::new(fields)
+            });
+
+            // Collect delete key tuples by iterating all columns in lockstep.
             #[allow(clippy::len_zero)]
             while datum_columns_with_names[0].0.len() > 0 {
-                let mut row_predicate = AlwaysTrue;
-                for &mut (ref mut column, ref field_name) in &mut datum_columns_with_names {
+                let mut key_values = Vec::with_capacity(datum_columns_with_names.len());
+                for (column, _, _) in &mut datum_columns_with_names {
                     if let Some(item) = column.next() {
-                        let cell_predicate = if let Some(datum) = item? {
-                            Reference::new(field_name.clone()).equal_to(datum.clone())
-                        } else {
-                            Reference::new(field_name.clone()).is_null()
-                        };
-                        row_predicate = row_predicate.and(cell_predicate)
+                        key_values.push(item?);
                     }
                 }
-                row_predicates.push(row_predicate.not().rewrite_not());
+                delete_set.keys.insert(EqDeleteKey(key_values));
             }
         }
 
-        // All row predicates are combined to a single predicate by creating a balanced binary tree.
-        // Using a simple fold would result in a deeply nested predicate that can cause a stack overflow.
-        while row_predicates.len() > 1 {
-            let mut next_level = Vec::with_capacity(row_predicates.len().div_ceil(2));
-            let mut iter = row_predicates.into_iter();
-            while let Some(p1) = iter.next() {
-                if let Some(p2) = iter.next() {
-                    next_level.push(p1.and(p2));
-                } else {
-                    next_level.push(p1);
-                }
-            }
-            row_predicates = next_level;
-        }
-
-        match row_predicates.pop() {
-            Some(p) => Ok(p),
-            None => Ok(AlwaysTrue),
-        }
+        Ok(eq_delete_set.unwrap_or_else(|| EqDeleteSet::new(Vec::new())))
     }
 }
 
 struct EqDelColumnProcessor<'a> {
     equality_ids: &'a HashSet<i32>,
-    collected_columns: Vec<(ArrayRef, String, Type)>,
+    collected_columns: Vec<(ArrayRef, String, i32, Type)>,
 }
 
 impl<'a> EqDelColumnProcessor<'a> {
@@ -441,6 +474,7 @@ impl<'a> EqDelColumnProcessor<'a> {
         }
     }
 
+    /// Produces per-column Datum iterators alongside (field_name, field_id) metadata.
#[allow(clippy::type_complexity)] fn finish( self, @@ -448,11 +482,12 @@ impl<'a> EqDelColumnProcessor<'a> { Vec<( Box>>>, String, + i32, )>, > { self.collected_columns .into_iter() - .map(|(array, field_name, field_type)| { + .map(|(array, field_name, field_id, field_type)| { let primitive_type = field_type .as_primitive_type() .ok_or_else(|| { @@ -477,7 +512,7 @@ impl<'a> EqDelColumnProcessor<'a> { .transpose() })); - Ok((datum_iterator, field_name)) + Ok((datum_iterator, field_name, field_id)) }) .collect::>>() } @@ -495,6 +530,7 @@ impl SchemaWithPartnerVisitor for EqDelColumnProcessor<'_> { self.collected_columns.push(( partner.clone(), field.name.clone(), + field.id, field.field_type.as_ref().clone(), )); } @@ -629,11 +665,58 @@ mod tests { ) .await .expect("error parsing batch stream"); - println!("{parsed_eq_delete}"); - let expected = "(((((y != 1) OR (z != 100)) OR (a != \"HELP\")) OR (sa != 4)) OR (b != 62696E6172795F64617461)) AND (((((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL)) OR (sa != 5)) OR (b IS NOT NULL))".to_string(); + // The delete file has 2 rows, so we expect 2 keys in the set + assert_eq!(parsed_eq_delete.keys.len(), 2); + + // Field metadata should list the 5 equality columns (y, z, a, sa, b) + assert_eq!(parsed_eq_delete.fields.len(), 5); + let field_names: Vec<&str> = parsed_eq_delete + .fields + .iter() + .map(|(n, _)| n.as_str()) + .collect(); + assert!(field_names.contains(&"y")); + assert!(field_names.contains(&"z")); + assert!(field_names.contains(&"a")); + assert!(field_names.contains(&"sa")); + assert!(field_names.contains(&"b")); + + // Row 1: y=1, z=100, a="HELP", sa=4, b=binary_data + let row1 = EqDeleteKey(vec![ + Some(Datum::long(1)), + Some(Datum::long(100)), + Some(Datum::string("HELP")), + Some(Datum::int(4)), + Some(Datum::binary(b"binary_data".to_vec())), + ]); + assert!( + parsed_eq_delete.keys.contains(&row1), + "Row 1 should be in delete set" + ); + + // Row 2: y=2, z=NULL, a=NULL, sa=5, b=NULL + let row2 = EqDeleteKey(vec![ + Some(Datum::long(2)), + None, + None, + Some(Datum::int(5)), + None, + ]); + assert!( + parsed_eq_delete.keys.contains(&row2), + "Row 2 should be in delete set" + ); - assert_eq!(parsed_eq_delete.to_string(), expected); + // A non-existent key should not be in the set + let non_existent = EqDeleteKey(vec![ + Some(Datum::long(999)), + Some(Datum::long(0)), + Some(Datum::string("NOPE")), + Some(Datum::int(0)), + Some(Datum::binary(b"nope".to_vec())), + ]); + assert!(!parsed_eq_delete.keys.contains(&non_existent)); } /// Create a simple field with metadata. 
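For illustration, here is a minimal, self-contained sketch of the hash-based lookup that the `EqDeleteKey`/`EqDeleteSet` types above rely on, assuming simplified stand-in types (`Value`, `Key`) in place of the crate's `Datum`: each delete row becomes a composite key in a `HashSet`, and each data row is checked with a single probe instead of evaluating a chain of inequality predicates.

```rust
use std::collections::HashSet;

// Simplified stand-in for the crate's `Datum`; the real code stores
// `Option<Datum>` per key column so that NULLs participate in equality.
#[derive(Hash, PartialEq, Eq, Debug, Clone)]
enum Value {
    Long(i64),
    Str(String),
}

// Composite key: one entry per equality_id column, `None` meaning NULL.
#[derive(Hash, PartialEq, Eq, Debug, Clone)]
struct Key(Vec<Option<Value>>);

fn main() {
    // Delete-file rows keyed on (id, name).
    let mut delete_keys: HashSet<Key> = HashSet::new();
    delete_keys.insert(Key(vec![Some(Value::Long(3)), Some(Value::Str("c".into()))]));
    delete_keys.insert(Key(vec![Some(Value::Long(5)), None])); // NULL name

    // Probing a data row is one O(1) hash lookup.
    let deleted_row = Key(vec![Some(Value::Long(3)), Some(Value::Str("c".into()))]);
    assert!(delete_keys.contains(&deleted_row));

    let kept_row = Key(vec![Some(Value::Long(3)), Some(Value::Str("x".into()))]);
    assert!(!delete_keys.contains(&kept_row));
}
```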
@@ -955,13 +1038,19 @@ mod tests { // Verify both delete types can be processed together let result = delete_filter - .build_equality_delete_predicate(&file_scan_task) + .build_equality_delete_sets(&file_scan_task) .await; assert!( result.is_ok(), - "Failed to build equality delete predicate: {:?}", + "Failed to build equality delete sets: {:?}", result.err() ); + // The equality delete sets should contain delete keys + let eq_sets = result.unwrap(); + assert!( + !eq_sets.is_empty(), + "Expected at least one equality delete set" + ); } #[tokio::test] diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 6369938ce2..8e0e002906 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -21,9 +21,8 @@ use std::sync::{Arc, Mutex, RwLock}; use tokio::sync::Notify; use tokio::sync::oneshot::Receiver; +use super::caching_delete_file_loader::EqDeleteSet; use crate::delete_vector::DeleteVector; -use crate::expr::Predicate::AlwaysTrue; -use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::scan::{FileScanTask, FileScanTaskDeleteFile}; use crate::spec::DataContentType; use crate::{Error, ErrorKind, Result}; @@ -31,7 +30,7 @@ use crate::{Error, ErrorKind, Result}; #[derive(Debug)] enum EqDelState { Loading(Arc), - Loaded(Predicate), + Loaded(Arc), } /// State tracking for positional delete files. @@ -148,17 +147,18 @@ impl DeleteFilter { } } - /// Retrieve the equality delete predicate for a given eq delete file path - pub(crate) async fn get_equality_delete_predicate_for_delete_file_path( + /// Retrieve the equality delete set for a given eq delete file path. + /// Waits asynchronously if the set is still being loaded. + pub(crate) async fn get_equality_delete_set_for_delete_file_path( &self, file_path: &str, - ) -> Option { + ) -> Option> { let notifier = { match self.state.read().unwrap().equality_deletes.get(file_path) { None => return None, Some(EqDelState::Loading(notifier)) => notifier.clone(), - Some(EqDelState::Loaded(predicate)) => { - return Some(predicate.clone()); + Some(EqDelState::Loaded(eq_delete_set)) => { + return Some(eq_delete_set.clone()); } } }; @@ -166,50 +166,73 @@ impl DeleteFilter { notifier.notified().await; match self.state.read().unwrap().equality_deletes.get(file_path) { - Some(EqDelState::Loaded(predicate)) => Some(predicate.clone()), + Some(EqDelState::Loaded(eq_delete_set)) => Some(eq_delete_set.clone()), _ => unreachable!("Cannot be any other state than loaded"), } } - /// Builds eq delete predicate for the provided task. - pub(crate) async fn build_equality_delete_predicate( + /// Builds equality delete sets for the provided task. + /// + /// Returns a list of delete sets, one per distinct `equality_ids` group. + /// Most tables use a single `equality_ids` set, so this typically returns + /// zero or one element. Multiple elements occur only when different delete + /// files on the same partition use different equality column sets. + /// + /// When only one delete file applies for a group, returns the cached `Arc` + /// directly — no deep clone of the hash set. 
+ pub(crate) async fn build_equality_delete_sets( &self, file_scan_task: &FileScanTask, - ) -> Result> { - // * Filter the task's deletes into just the Equality deletes - // * Retrieve the unbound predicate for each from self.state.equality_deletes - // * Logical-AND them all together to get a single combined `Predicate` - // * Bind the predicate to the task's schema to get a `BoundPredicate` + ) -> Result>> { + // Collect all applicable equality delete sets, reusing cached Arcs. + // Group by field layout so we only union sets with matching columns. + let mut groups: HashMap, Vec>> = HashMap::new(); - let mut combined_predicate = AlwaysTrue; for delete in &file_scan_task.deletes { if !is_equality_delete(delete) { continue; } - let Some(predicate) = self - .get_equality_delete_predicate_for_delete_file_path(&delete.file_path) + let Some(eq_set) = self + .get_equality_delete_set_for_delete_file_path(&delete.file_path) .await else { return Err(Error::new( ErrorKind::Unexpected, format!( - "Missing predicate for equality delete file '{}'", + "Missing equality delete set for file '{}'", delete.file_path ), )); }; - combined_predicate = combined_predicate.and(predicate); + if !eq_set.is_empty() { + groups + .entry(eq_set.fields.clone()) + .or_default() + .push(eq_set); + } } - if combined_predicate == AlwaysTrue { - return Ok(None); + // For each group, union all sets into one. + let mut result = Vec::with_capacity(groups.len()); + for (_fields, sets) in groups { + match sets.len() { + 0 => {} + // Single file in group: return the cached Arc directly. + 1 => result.push(sets.into_iter().next().unwrap()), + // Multiple files with same fields: union into a new set. + _ => { + let mut combined = (*sets[0]).clone(); + for set in &sets[1..] { + combined.union(set); + } + result.push(Arc::new(combined)); + } + } } - let bound_predicate = combined_predicate - .bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?; - Ok(Some(bound_predicate)) + Ok(result) } pub(crate) fn upsert_delete_vector( @@ -232,7 +255,7 @@ impl DeleteFilter { pub(crate) fn insert_equality_delete( &self, delete_file_path: &str, - eq_del: Receiver, + eq_del: Receiver>, ) { let notify = Arc::new(Notify::new()); { @@ -276,8 +299,9 @@ pub(crate) mod tests { use tempfile::TempDir; use super::*; - use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; - use crate::expr::Reference; + use crate::arrow::caching_delete_file_loader::{ + CachingDeleteFileLoader, EqDeleteKey, EqDeleteSet, + }; use crate::io::FileIO; use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type}; @@ -468,18 +492,17 @@ pub(crate) mod tests { } #[tokio::test] - async fn test_build_equality_delete_predicate_case_sensitive() { + async fn test_build_equality_delete_set_unions_multiple_files() { let schema = Arc::new( Schema::builder() .with_schema_id(1) .with_fields(vec![ - NestedField::required(1, "Id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), ]) .build() .unwrap(), ); - // ---------- fake FileScanTask ---------- let task = FileScanTask { file_size_in_bytes: 0, start: 0, @@ -488,15 +511,24 @@ pub(crate) mod tests { data_file_path: "data.parquet".to_string(), data_file_format: crate::spec::DataFileFormat::Parquet, schema: schema.clone(), - project_field_ids: vec![], + project_field_ids: vec![1], predicate: None, - deletes: vec![FileScanTaskDeleteFile { - file_path: "eq-del.parquet".to_string(), - file_size_in_bytes: 1, // never read; 
this test fails before opening the file - file_type: DataContentType::EqualityDeletes, - partition_spec_id: 0, - equality_ids: None, - }], + deletes: vec![ + FileScanTaskDeleteFile { + file_path: "eq-del-1.parquet".to_string(), + file_size_in_bytes: 1, + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![1]), + }, + FileScanTaskDeleteFile { + file_path: "eq-del-2.parquet".to_string(), + file_size_in_bytes: 1, + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![1]), + }, + ], partition: None, partition_spec: None, name_mapping: None, @@ -505,20 +537,148 @@ pub(crate) mod tests { let filter = DeleteFilter::default(); - // ---------- insert equality delete predicate ---------- - let pred = Reference::new("id").equal_to(Datum::long(10)); + // Insert two equality delete sets with different keys + let mut set1 = EqDeleteSet { + keys: std::collections::HashSet::new(), + fields: vec![("id".to_string(), 1)], + }; + set1.keys.insert(EqDeleteKey(vec![Some(Datum::long(10))])); + set1.keys.insert(EqDeleteKey(vec![Some(Datum::long(20))])); - let (tx, rx) = tokio::sync::oneshot::channel(); - filter.insert_equality_delete("eq-del.parquet", rx); + let mut set2 = EqDeleteSet { + keys: std::collections::HashSet::new(), + fields: vec![("id".to_string(), 1)], + }; + set2.keys.insert(EqDeleteKey(vec![Some(Datum::long(30))])); - tx.send(pred).unwrap(); + let (tx1, rx1) = tokio::sync::oneshot::channel(); + filter.insert_equality_delete("eq-del-1.parquet", rx1); + tx1.send(Arc::new(set1)).unwrap(); - // ---------- should FAIL ---------- - let result = filter.build_equality_delete_predicate(&task).await; + let (tx2, rx2) = tokio::sync::oneshot::channel(); + filter.insert_equality_delete("eq-del-2.parquet", rx2); + tx2.send(Arc::new(set2)).unwrap(); + // Small delay to allow the spawned tasks to complete + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + let result = filter.build_equality_delete_sets(&task).await; + assert!(result.is_ok()); + + let eq_sets = result.unwrap(); + // Same equality_ids → unioned into one set + assert_eq!(eq_sets.len(), 1); + let eq_set = &eq_sets[0]; + // Union of {10, 20} and {30} should contain all three + assert_eq!(eq_set.keys.len(), 3); + assert!( + eq_set + .keys + .contains(&EqDeleteKey(vec![Some(Datum::long(10))])) + ); + assert!( + eq_set + .keys + .contains(&EqDeleteKey(vec![Some(Datum::long(20))])) + ); assert!( - result.is_err(), - "case_sensitive=true should fail when column case mismatches" + eq_set + .keys + .contains(&EqDeleteKey(vec![Some(Datum::long(30))])) + ); + } + + /// Delete files with different equality_ids must NOT be unioned — they + /// produce separate sets, each applied independently. 
+ #[tokio::test] + async fn test_build_equality_delete_sets_different_equality_ids() { + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + let task = FileScanTask { + file_size_in_bytes: 0, + start: 0, + length: 0, + record_count: None, + data_file_path: "data.parquet".to_string(), + data_file_format: crate::spec::DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + deletes: vec![ + FileScanTaskDeleteFile { + file_path: "eq-del-by-id.parquet".to_string(), + file_size_in_bytes: 1, + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![1]), + }, + FileScanTaskDeleteFile { + file_path: "eq-del-by-name.parquet".to_string(), + file_size_in_bytes: 1, + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![2]), + }, + ], + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: true, + }; + + let filter = DeleteFilter::default(); + + // Delete file 1: delete by id + let mut set_by_id = EqDeleteSet { + keys: std::collections::HashSet::new(), + fields: vec![("id".to_string(), 1)], + }; + set_by_id + .keys + .insert(EqDeleteKey(vec![Some(Datum::long(10))])); + + // Delete file 2: delete by name + let mut set_by_name = EqDeleteSet { + keys: std::collections::HashSet::new(), + fields: vec![("name".to_string(), 2)], + }; + set_by_name + .keys + .insert(EqDeleteKey(vec![Some(Datum::string("alice"))])); + + let (tx1, rx1) = tokio::sync::oneshot::channel(); + filter.insert_equality_delete("eq-del-by-id.parquet", rx1); + tx1.send(Arc::new(set_by_id)).unwrap(); + + let (tx2, rx2) = tokio::sync::oneshot::channel(); + filter.insert_equality_delete("eq-del-by-name.parquet", rx2); + tx2.send(Arc::new(set_by_name)).unwrap(); + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + let eq_sets = filter + .build_equality_delete_sets(&task) + .await + .expect("should succeed"); + + // Different equality_ids → two separate sets, NOT unioned + assert_eq!( + eq_sets.len(), + 2, + "Delete files with different equality_ids must produce separate sets" ); + + // Each set should have exactly one key + let key_counts: Vec = eq_sets.iter().map(|s| s.keys.len()).collect(); + assert!(key_counts.contains(&1)); } } diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 488f41cf20..766f3aae45 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -17,7 +17,7 @@ //! 
Parquet file data reader -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::ops::Range; use std::str::FromStr; use std::sync::Arc; @@ -29,6 +29,7 @@ use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; use arrow_schema::{ ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; +use arrow_select::filter::filter_record_batch; use arrow_string::like::starts_with; use bytes::Bytes; use fnv::FnvHashSet; @@ -45,10 +46,10 @@ use parquet::file::metadata::{ use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; use typed_builder::TypedBuilder; -use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; +use crate::arrow::caching_delete_file_loader::{CachingDeleteFileLoader, EqDeleteKey, EqDeleteSet}; use crate::arrow::int96::coerce_int96_timestamps; use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; -use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; +use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema, get_arrow_datum}; use crate::delete_vector::DeleteVector; use crate::error::Result; use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit}; @@ -58,7 +59,7 @@ use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; -use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type}; +use crate::spec::{DataContentType, Datum, NameMapping, NestedField, PrimitiveType, Schema, Type}; use crate::util::available_parallelism; use crate::{Error, ErrorKind}; @@ -420,12 +421,41 @@ impl ArrowReader { .copied() .collect(); - // Create projection mask based on field IDs - // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false) - // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match) - // - If fallback IDs: position-based projection (missing_field_ids=true) + // Collect equality delete key field IDs from the task's delete files. + // These may reference columns NOT in the user's projection. We must + // include them in the Parquet read so equality deletes can be applied, + // then strip them from the output batches afterward. + let eq_delete_key_field_ids: BTreeSet = task + .deletes + .iter() + .filter(|d| matches!(d.file_type, DataContentType::EqualityDeletes)) + .filter_map(|d| d.equality_ids.as_ref()) + .flatten() + .copied() + .collect(); + + // Augment the Parquet projection with any equality delete key columns + // that the user didn't request. Guard: when the user's projection is + // empty, ProjectionMask::all() reads all columns — no augmentation needed. 
+        let augmented_field_ids: Vec<i32> = if !eq_delete_key_field_ids.is_empty()
+            && !project_field_ids_without_metadata.is_empty()
+        {
+            let user_set: HashSet<i32> =
+                project_field_ids_without_metadata.iter().copied().collect();
+            let mut augmented = project_field_ids_without_metadata.clone();
+            for &id in &eq_delete_key_field_ids {
+                if !user_set.contains(&id) && !is_metadata_field(id) {
+                    augmented.push(id);
+                }
+            }
+            augmented
+        } else {
+            project_field_ids_without_metadata.clone()
+        };
+
+        // Create projection mask based on field IDs (augmented with eq delete keys)
         let projection_mask = Self::get_arrow_projection_mask(
-            &project_field_ids_without_metadata,
+            &augmented_field_ids,
             &task.schema,
             record_batch_stream_builder.parquet_schema(),
             record_batch_stream_builder.schema(),
@@ -437,9 +467,25 @@ impl ArrowReader {
 
         // RecordBatchTransformer performs any transformations required on the RecordBatches
         // that come back from the file, such as type promotion, default column insertion,
-        // column re-ordering, partition constants, and virtual field addition (like _file)
+        // column re-ordering, partition constants, and virtual field addition (like _file).
+        // When equality delete key columns were added to the projection, the transformer
+        // must also know about them so it can apply type promotion correctly.
+        let transformer_field_ids: Vec<i32> =
+            if !eq_delete_key_field_ids.is_empty() && !task.project_field_ids.is_empty() {
+                let user_set: HashSet<i32> = task.project_field_ids.iter().copied().collect();
+                let mut ids = task.project_field_ids.to_vec();
+                for &id in &eq_delete_key_field_ids {
+                    if !user_set.contains(&id) {
+                        ids.push(id);
+                    }
+                }
+                ids
+            } else {
+                task.project_field_ids.to_vec()
+            };
+
         let mut record_batch_transformer_builder =
-            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
+            RecordBatchTransformerBuilder::new(task.schema_ref(), &transformer_field_ids);
 
         // Add the _file metadata column if it's in the projected fields
         if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
@@ -462,36 +508,21 @@ impl ArrowReader {
         }
 
         let delete_filter = delete_filter_rx.await.unwrap()?;
-        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
-
-        // In addition to the optional predicate supplied in the `FileScanTask`,
-        // we also have an optional predicate resulting from equality delete files.
-        // If both are present, we logical-AND them together to form a single filter
-        // predicate that we can pass to the `RecordBatchStreamBuilder`.
-        let final_predicate = match (&task.predicate, delete_predicate) {
-            (None, None) => None,
-            (Some(predicate), None) => Some(predicate.clone()),
-            (None, Some(ref predicate)) => Some(predicate.clone()),
-            (Some(filter_predicate), Some(delete_predicate)) => {
-                Some(filter_predicate.clone().and(delete_predicate))
-            }
-        };
+        let eq_delete_sets = delete_filter.build_equality_delete_sets(&task).await?;
 
-        // There are three possible sources for potential lists of selected RowGroup indices,
-        // and two for `RowSelection`s.
-        // Selected RowGroup index lists can come from three sources:
+        // The scan predicate (if any) is applied via the Parquet RowFilter.
+        // Equality deletes are applied as a separate post-read filter step using
+        // a HashSet for O(1) per-row lookups instead of O(N) predicate evaluation.
+ let final_predicate = task.predicate.clone(); + + // Selected RowGroup index lists can come from two sources: // * When task.start and task.length specify a byte range (file splitting); - // * When there are equality delete files that are applicable; // * When there is a scan predicate and row_group_filtering_enabled = true. // `RowSelection`s can be created in either or both of the following cases: // * When there are positional delete files that are applicable; // * When there is a scan predicate and row_selection_enabled = true - // Note that row group filtering from predicates only happens when - // there is a scan predicate AND row_group_filtering_enabled = true, - // but we perform row selection filtering if there are applicable - // equality delete files OR (there is a scan predicate AND row_selection_enabled), - // since the only implemented method of applying positional deletes is - // by using a `RowSelection`. + // Equality deletes are applied as a post-read hash-based filter (not via + // RowFilter or RowSelection) for O(1) per-row lookups. let mut selected_row_group_indices = None; let mut row_selection = None; @@ -600,7 +631,152 @@ impl ArrowReader { Err(err) => Err(err.into()), }); - Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) + // Apply equality delete filtering as a post-read step using hash-based + // lookups. This runs after decompression and the record batch transformer, + // checking each row against each delete set in O(1) per row. + // Multiple sets occur only when delete files use different equality_ids. + // + // If we augmented the projection with equality delete key columns that + // the user didn't request, strip those extra columns after applying + // deletes so the output schema matches the user's original projection. + let extra_eq_cols_to_strip = if !eq_delete_key_field_ids.is_empty() + && !task.project_field_ids.is_empty() + && eq_delete_key_field_ids + .iter() + .any(|id| !task.project_field_ids.contains(id)) + { + task.project_field_ids.len() + } else { + 0 + }; + + if eq_delete_sets.is_empty() { + if extra_eq_cols_to_strip > 0 { + let stripped = record_batch_stream.map(move |batch_result| { + Self::strip_extra_columns(batch_result?, extra_eq_cols_to_strip) + }); + Ok(Box::pin(stripped) as ArrowRecordBatchStream) + } else { + Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) + } + } else { + let filtered = record_batch_stream.map(move |batch_result| { + let mut batch = batch_result?; + for eq_delete_set in &eq_delete_sets { + batch = Self::apply_eq_delete_filter(&batch, eq_delete_set)?; + } + if extra_eq_cols_to_strip > 0 { + batch = Self::strip_extra_columns(batch, extra_eq_cols_to_strip)?; + } + Ok(batch) + }); + Ok(Box::pin(filtered) as ArrowRecordBatchStream) + } + } + + /// Filters a record batch by removing rows whose equality-delete key columns + /// match an entry in the delete set. Uses O(1) hash lookups per row. + fn apply_eq_delete_filter( + batch: &RecordBatch, + delete_set: &EqDeleteSet, + ) -> Result { + // Locate delete key columns in the batch by field_id (stored in Arrow + // field metadata under the "PARQUET:field_id" key). + // For each delete key field, locate the corresponding column in the batch + // and convert it to a Vec> for hash-based lookups. + let datum_columns: Vec>> = delete_set + .fields + .iter() + .map(|(field_name, field_id)| { + // Find the column by field_id in the batch schema metadata, + // falling back to name-based lookup. 
+ let col = batch + .schema() + .fields() + .iter() + .enumerate() + .find_map(|(col_idx, field)| { + let id = field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY)? + .parse::() + .ok()?; + (id == *field_id).then(|| batch.column(col_idx)) + }) + .map(Ok) + .unwrap_or_else(|| { + batch.schema().index_of(field_name).map(|idx| batch.column(idx)).map_err(|_| { + Error::new( + ErrorKind::Unexpected, + format!( + "Equality delete key column '{}' (field_id={}) not found in batch", + field_name, field_id + ), + ) + }) + })?; + // Resolve the Iceberg PrimitiveType from the Arrow data type. + let iceberg_type = + crate::arrow::arrow_type_to_type(col.data_type())?; + let literals = arrow_primitive_to_literal(col, &iceberg_type)?; + // Convert Literal → Datum + let primitive_type = iceberg_type + .as_primitive_type() + .ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "field is not a primitive type") + })? + .clone(); + let datums = literals + .into_iter() + .map(|opt_lit| { + opt_lit + .and_then(|lit| lit.as_primitive_literal()) + .map(|prim_lit| Datum::new(primitive_type.clone(), prim_lit)) + }) + .collect::>(); + Ok(datums) + }) + .collect::>>()?; + + let num_rows = batch.num_rows(); + let num_cols = datum_columns.len(); + let mut keep = vec![true; num_rows]; + + // Reuse a single EqDeleteKey allocation across all rows to avoid + // per-row Vec allocation + clone. We swap in new values each iteration. + let mut probe_key = EqDeleteKey(vec![None; num_cols]); + + for row_idx in 0..num_rows { + for (col_idx, col) in datum_columns.iter().enumerate() { + probe_key.0[col_idx].clone_from(&col[row_idx]); + } + if delete_set.keys.contains(&probe_key) { + keep[row_idx] = false; + } + } + + let mask = BooleanArray::from(keep); + filter_record_batch(batch, &mask).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + format!("Failed to filter record batch: {e}"), + ) + }) + } + + /// Strips columns beyond `num_cols_to_keep` from the batch. + /// + /// Used to remove equality delete key columns that were added to the + /// projection solely for delete evaluation. The extra columns are always + /// appended at the end by the augmentation logic in `process_file_scan_task`. + fn strip_extra_columns(batch: RecordBatch, num_cols_to_keep: usize) -> Result { + let indices: Vec = (0..num_cols_to_keep).collect(); + batch.project(&indices).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + format!("stripping eq delete key columns: {e}"), + ) + }) } /// Opens a Parquet file and loads its metadata, returning both the reader and metadata. @@ -5464,4 +5640,359 @@ message schema { ts_array.value(0) ); } + + // ========================================================================= + // Equality delete + column projection tests + // + // Verify that equality deletes work correctly when the user's column + // projection does not include the equality delete key columns. + // ========================================================================= + + /// Helper: create a 3-column data file and an equality delete Parquet file. + /// + /// Data: 5 rows — id(1..=5), name(a..e), value(10,20,30,40,50). + /// Delete file schema uses the caller-provided Arrow schema + batches. 
+ fn create_eq_delete_test_fixtures( + table_location: &str, + delete_batches: Vec, + ) -> (String, String, SchemaRef) { + use arrow_array::Int32Array; + + let table_schema: SchemaRef = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "value", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let data_arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + Field::new("value", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + ])); + + let data_file_path = format!("{table_location}/data.parquet"); + let data_batch = RecordBatch::try_new(data_arrow_schema.clone(), vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])), + Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])), + ]) + .unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + let file = File::create(&data_file_path).unwrap(); + let mut writer = + ArrowWriter::try_new(file, data_arrow_schema.clone(), Some(props)).unwrap(); + writer.write(&data_batch).unwrap(); + writer.close().unwrap(); + + let delete_file_path = format!("{table_location}/eq_deletes.parquet"); + let delete_schema = delete_batches[0].schema(); + let delete_props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + let delete_file = File::create(&delete_file_path).unwrap(); + let mut delete_writer = + ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap(); + for batch in &delete_batches { + delete_writer.write(batch).unwrap(); + } + delete_writer.close().unwrap(); + + (data_file_path, delete_file_path, table_schema) + } + + /// Helper: build a single-column delete batch for field "id" (field_id=1). + fn eq_delete_batch_for_id(ids: Vec) -> RecordBatch { + use arrow_array::Int32Array; + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(ids))]).unwrap() + } + + /// Test A: Projected scan EXCLUDES the equality delete key column. 
+ #[tokio::test] + async fn test_eq_delete_projection_excludes_delete_key() { + let tmp_dir = TempDir::new().unwrap(); + let loc = tmp_dir.path().to_str().unwrap().to_string(); + + let (data_path, del_path, schema) = + create_eq_delete_test_fixtures(&loc, vec![eq_delete_batch_for_id(vec![3])]); + + let reader = ArrowReaderBuilder::new(FileIO::new_with_fs()).build(); + + let task = FileScanTask { + file_size_in_bytes: std::fs::metadata(&data_path).unwrap().len(), + start: 0, + length: 0, + record_count: Some(5), + data_file_path: data_path, + data_file_format: DataFileFormat::Parquet, + schema, + project_field_ids: vec![2, 3], // name, value — NOT id + predicate: None, + deletes: vec![FileScanTaskDeleteFile { + file_size_in_bytes: std::fs::metadata(&del_path).unwrap().len(), + file_path: del_path, + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![1]), + }], + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }; + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let result: Vec = reader.read(tasks).unwrap().try_collect().await.unwrap(); + + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 4, "Expected 4 rows after deleting id=3"); + + let out_schema = result[0].schema(); + let col_names: Vec<&str> = out_schema + .fields() + .iter() + .map(|f| f.name().as_str()) + .collect(); + assert_eq!( + col_names, + vec!["name", "value"], + "Output must only contain projected columns" + ); + + let names: Vec = result + .iter() + .flat_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|s| s.unwrap().to_string()) + .collect::>() + }) + .collect(); + assert_eq!(names, vec!["a", "b", "d", "e"]); + } + + /// Test B: Projected scan INCLUDES the equality delete key column. 
+ #[tokio::test] + async fn test_eq_delete_projection_includes_delete_key() { + use arrow_array::Int32Array; + + let tmp_dir = TempDir::new().unwrap(); + let loc = tmp_dir.path().to_str().unwrap().to_string(); + + let (data_path, del_path, schema) = + create_eq_delete_test_fixtures(&loc, vec![eq_delete_batch_for_id(vec![3])]); + + let reader = ArrowReaderBuilder::new(FileIO::new_with_fs()).build(); + + let task = FileScanTask { + file_size_in_bytes: std::fs::metadata(&data_path).unwrap().len(), + start: 0, + length: 0, + record_count: Some(5), + data_file_path: data_path, + data_file_format: DataFileFormat::Parquet, + schema, + project_field_ids: vec![1, 2, 3], // id, name, value — includes delete key + predicate: None, + deletes: vec![FileScanTaskDeleteFile { + file_size_in_bytes: std::fs::metadata(&del_path).unwrap().len(), + file_path: del_path, + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![1]), + }], + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }; + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let result: Vec = reader.read(tasks).unwrap().try_collect().await.unwrap(); + + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 4); + + let out_schema = result[0].schema(); + let col_names: Vec<&str> = out_schema + .fields() + .iter() + .map(|f| f.name().as_str()) + .collect(); + assert_eq!(col_names, vec!["id", "name", "value"]); + + let ids: Vec = result + .iter() + .flat_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect::>() + }) + .collect(); + assert_eq!(ids, vec![1, 2, 4, 5]); + } + + /// Test C: Full scan (all fields projected) with equality deletes. + #[tokio::test] + async fn test_eq_delete_full_scan_no_projection() { + let tmp_dir = TempDir::new().unwrap(); + let loc = tmp_dir.path().to_str().unwrap().to_string(); + + let (data_path, del_path, schema) = + create_eq_delete_test_fixtures(&loc, vec![eq_delete_batch_for_id(vec![3])]); + + let reader = ArrowReaderBuilder::new(FileIO::new_with_fs()).build(); + + let task = FileScanTask { + file_size_in_bytes: std::fs::metadata(&data_path).unwrap().len(), + start: 0, + length: 0, + record_count: Some(5), + data_file_path: data_path, + data_file_format: DataFileFormat::Parquet, + schema, + project_field_ids: vec![1, 2, 3], + predicate: None, + deletes: vec![FileScanTaskDeleteFile { + file_size_in_bytes: std::fs::metadata(&del_path).unwrap().len(), + file_path: del_path, + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![1]), + }], + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }; + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let result: Vec = reader.read(tasks).unwrap().try_collect().await.unwrap(); + + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 4); + + let out_schema = result[0].schema(); + let col_names: Vec<&str> = out_schema + .fields() + .iter() + .map(|f| f.name().as_str()) + .collect(); + assert_eq!(col_names, vec!["id", "name", "value"]); + } + + /// Test D: Multi-field equality delete, NEITHER key in projection. 
+ #[tokio::test] + async fn test_eq_delete_multi_field_key_excluded_from_projection() { + use arrow_array::Int32Array; + + let tmp_dir = TempDir::new().unwrap(); + let loc = tmp_dir.path().to_str().unwrap().to_string(); + + // Delete keyed on BOTH id(1) AND name(2) + let del_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + ])); + let del_batch = RecordBatch::try_new(del_schema, vec![ + Arc::new(Int32Array::from(vec![3])), + Arc::new(StringArray::from(vec!["c"])), + ]) + .unwrap(); + + let (data_path, del_path, schema) = create_eq_delete_test_fixtures(&loc, vec![del_batch]); + + let reader = ArrowReaderBuilder::new(FileIO::new_with_fs()).build(); + + let task = FileScanTask { + file_size_in_bytes: std::fs::metadata(&data_path).unwrap().len(), + start: 0, + length: 0, + record_count: Some(5), + data_file_path: data_path, + data_file_format: DataFileFormat::Parquet, + schema, + project_field_ids: vec![3], // only value — neither delete key + predicate: None, + deletes: vec![FileScanTaskDeleteFile { + file_size_in_bytes: std::fs::metadata(&del_path).unwrap().len(), + file_path: del_path, + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![1, 2]), + }], + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }; + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let result: Vec = reader.read(tasks).unwrap().try_collect().await.unwrap(); + + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 4, "Expected 4 rows after deleting id=3,name=c"); + + let out_schema = result[0].schema(); + let col_names: Vec<&str> = out_schema + .fields() + .iter() + .map(|f| f.name().as_str()) + .collect(); + assert_eq!( + col_names, + vec!["value"], + "Output must only contain projected columns" + ); + + let values: Vec = result + .iter() + .flat_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect::>() + }) + .collect(); + assert_eq!(values, vec![10, 20, 40, 50]); + } }
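For illustration, a minimal, self-contained sketch of the post-read filtering shape exercised by the tests above, written directly against the arrow crates (column names and values here are made up): build a keep-mask with one hash probe per row, filter the batch, then project away the helper delete-key column so the output matches the user's projection.

```rust
use std::collections::HashSet;
use std::sync::Arc;

use arrow_array::{BooleanArray, Int32Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use arrow_select::filter::filter_record_batch;

fn main() -> Result<(), ArrowError> {
    // A batch as read with the augmented projection: the user asked only for
    // `value`, and the delete key column `id` was appended for delete evaluation.
    let schema = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Int32, false),
        Field::new("id", DataType::Int32, false),
    ]));
    let batch = RecordBatch::try_new(schema, vec![
        Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
        Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
    ])?;

    // Keys read from the equality delete file (keyed on `id`).
    let deleted: HashSet<i32> = [3].into_iter().collect();

    // One hash probe per row builds the keep-mask, then a vectorized filter.
    let ids = batch
        .column(1)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    let keep: Vec<bool> = (0..batch.num_rows())
        .map(|i| !deleted.contains(&ids.value(i)))
        .collect();
    let filtered = filter_record_batch(&batch, &BooleanArray::from(keep))?;

    // Drop the helper `id` column so the output matches the user's projection.
    let result = filtered.project(&[0])?;
    assert_eq!(result.num_rows(), 4);
    assert_eq!(result.num_columns(), 1);
    Ok(())
}
```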