crates/iceberg/src/arrow/caching_delete_file_loader.rs (201 changes: 145 additions & 56 deletions)
@@ -16,7 +16,6 @@
// under the License.

use std::collections::{HashMap, HashSet};
use std::ops::Not;
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray};
@@ -27,8 +26,6 @@ use super::delete_filter::{DeleteFilter, PosDelLoadAction};
use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema};
use crate::delete_vector::DeleteVector;
use crate::expr::Predicate::AlwaysTrue;
use crate::expr::{Predicate, Reference};
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
use crate::spec::{
@@ -38,6 +35,41 @@ use crate::spec::{
};
use crate::{Error, ErrorKind, Result};

/// A composite key for equality delete lookups. Each element corresponds to one
/// column referenced by `equality_ids`; for single-column deletes the key holds a single element.
#[derive(Hash, Eq, PartialEq, Debug, Clone)]
pub(crate) struct EqDeleteKey(pub(crate) Vec<Option<Datum>>);

/// Bundles the hash set of delete keys with the field metadata needed to extract
/// matching keys from data record batches.
#[derive(Debug, Clone)]
pub(crate) struct EqDeleteSet {
/// Delete key tuples to filter out of data batches.
pub(crate) keys: HashSet<EqDeleteKey>,
/// Ordered list of (field_name, field_id) used to locate the key columns in
/// data record batches. The order matches the element order in `EqDeleteKey`.
pub(crate) fields: Vec<(String, i32)>,
}

impl EqDeleteSet {
fn new(fields: Vec<(String, i32)>) -> Self {
Self {
keys: HashSet::new(),
fields,
}
}

/// Returns true when the set contains no delete keys.
pub(crate) fn is_empty(&self) -> bool {
self.keys.is_empty()
}

/// Merge another set (with the same field layout) into this one.
pub(crate) fn union(&mut self, other: &EqDeleteSet) {
self.keys.extend(other.keys.iter().cloned());
}
}
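
For orientation, a minimal usage sketch of the two types above (the field ids and values here are made up; the `Datum` constructors are the same ones the tests below use). A `None` element models a NULL cell, so delete rows containing NULLs still hash and compare deterministically:

// Key elements are ordered per `fields`: here ("y", id 1) then ("a", id 3).
let mut set = EqDeleteSet::new(vec![("y".to_string(), 1), ("a".to_string(), 3)]);
set.keys.insert(EqDeleteKey(vec![Some(Datum::long(2)), None]));

// Merging a second set with the same field layout is a plain extend.
let mut other = EqDeleteSet::new(set.fields.clone());
other.keys.insert(EqDeleteKey(vec![Some(Datum::long(7)), Some(Datum::string("x"))]));
set.union(&other);

assert_eq!(set.keys.len(), 2);
assert!(set.keys.contains(&EqDeleteKey(vec![Some(Datum::long(2)), None])));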

#[derive(Clone, Debug)]
pub(crate) struct CachingDeleteFileLoader {
basic_delete_file_loader: BasicDeleteFileLoader,
@@ -59,7 +91,7 @@ enum DeleteFileContext {
FreshEqDel {
batch_stream: ArrowRecordBatchStream,
equality_ids: HashSet<i32>,
sender: tokio::sync::oneshot::Sender<Predicate>,
sender: tokio::sync::oneshot::Sender<Arc<EqDeleteSet>>,
},
}

@@ -99,16 +131,17 @@ impl CachingDeleteFileLoader {
/// another concurrently processing data file scan task. If it is, we skip it.
/// If not, the DeleteFilter is updated to contain a notifier to prevent other data file
/// tasks from starting to load the same equality delete file. We spawn a task to load
/// the EQ delete's record batch stream, convert it to a predicate, update the delete filter,
/// and notify any task that was waiting for it.
/// the EQ delete's record batch stream, convert it to an `EqDeleteSet` (hash set of
/// delete key tuples), update the delete filter, and notify any task that was waiting
/// for it.
/// * When this gets updated to add support for delete vectors, the load phase will return
/// a PuffinReader for them.
/// * The parse phase parses each record batch stream according to its associated data type.
/// The result of this is a map of data file paths to delete vectors for the positional
/// delete tasks (and in future for the delete vector tasks). For equality delete
/// file tasks, this results in an unbound Predicate.
/// * The unbound Predicates resulting from equality deletes are sent to their associated oneshot
/// channel to store them in the right place in the delete file managers state.
/// file tasks, this results in an `EqDeleteSet` (hash set of delete key tuples).
/// * The `EqDeleteSet`s resulting from equality deletes are sent to their associated oneshot
/// channel to store them in the right place in the delete file manager's state.
/// * The results of all of these futures are awaited on in parallel with the specified
/// level of concurrency and collected into a vec. We then combine all the delete
/// vector maps that resulted from any positional delete or delete vector files into a
@@ -130,7 +163,7 @@ impl CachingDeleteFileLoader {
/// Pos Del Del Vec (Not yet Implemented) EQ Del
/// | | |
/// [parse pos del stream] [parse del vec puffin] [parse eq del]
/// HashMap<String, RoaringTreeMap> HashMap<String, RoaringTreeMap> (Predicate, Sender)
/// HashMap<String, RoaringTreeMap> HashMap<String, RoaringTreeMap> (EqDeleteSet, Sender)
/// | | |
/// | | [persist to state]
/// | | ()
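
To make the notify step above concrete, here is a hedged, self-contained sketch of the oneshot handoff (plain tokio; the function name and trivial payload are illustrative, not this change's exact wiring):

use std::sync::Arc;
use tokio::sync::oneshot;

async fn demo_handoff() {
    let (sender, receiver) = oneshot::channel::<Arc<EqDeleteSet>>();

    // The task that claimed the equality delete file parses it and publishes the set.
    tokio::spawn(async move {
        let set = Arc::new(EqDeleteSet::new(Vec::new())); // stand-in for the parsed set
        let _ = sender.send(set);
    });

    // Any scan task that raced on the same delete file simply awaits the receiver.
    let set = receiver.await.expect("loader task dropped the sender");
    assert!(set.is_empty());
}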
Expand Down Expand Up @@ -249,9 +282,17 @@ impl CachingDeleteFileLoader {
let (sender, receiver) = channel();
del_filter.insert_equality_delete(&task.file_path, receiver);

// Per the Iceberg spec, evolve schema for equality deletes but only for the
// equality_ids columns, not all table columns.
let equality_ids_vec = task.equality_ids.clone().unwrap();
// Per the Iceberg spec, equality_ids is required for equality delete files.
// Evolve schema only for the equality_ids columns, not all table columns.
let equality_ids_vec = task.equality_ids.clone().ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!(
"equality_ids is required for equality delete file '{}' but was not set",
task.file_path
),
)
})?;
let evolved_stream = BasicDeleteFileLoader::evolve_schema(
basic_delete_file_loader
.parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
@@ -293,16 +334,16 @@
batch_stream,
equality_ids,
} => {
let predicate =
let eq_delete_set =
Self::parse_equality_deletes_record_batch_stream(batch_stream, equality_ids)
.await?;

sender
.send(predicate)
.send(Arc::new(eq_delete_set))
.map_err(|err| {
Error::new(
ErrorKind::Unexpected,
"Could not send eq delete predicate to state",
"Could not send eq delete set to state",
)
})
.map(|_| ParsedDeleteFileContext::EqDel)
@@ -354,19 +395,23 @@
Ok(result)
}

/// Parses equality delete record batches into a hash-based delete set.
///
/// We collect delete key tuples into a `HashSet` for O(1) per-row lookups.
async fn parse_equality_deletes_record_batch_stream(
mut stream: ArrowRecordBatchStream,
equality_ids: HashSet<i32>,
) -> Result<Predicate> {
let mut row_predicates = Vec::new();
) -> Result<EqDeleteSet> {
let mut batch_schema_iceberg: Option<Schema> = None;
let accessor = EqDelRecordBatchPartnerAccessor;
// Discover field metadata from the first non-empty batch.
let mut eq_delete_set: Option<EqDeleteSet> = None;

while let Some(record_batch) = stream.next().await {
let record_batch = record_batch?;

if record_batch.num_columns() == 0 {
return Ok(AlwaysTrue);
return Ok(EqDeleteSet::new(Vec::new()));
}

let schema = match &batch_schema_iceberg {
@@ -388,49 +433,37 @@
continue;
}

// Process the collected columns in lockstep
// Lazily initialize the EqDeleteSet with field metadata from the
// first batch that has columns. The field order is stable across
// batches because it comes from the delete file schema.
let delete_set = eq_delete_set.get_or_insert_with(|| {
let fields = datum_columns_with_names
.iter()
.map(|(_, name, field_id)| (name.clone(), *field_id))
.collect();
EqDeleteSet::new(fields)
});

// Collect delete key tuples by iterating all columns in lockstep.
#[allow(clippy::len_zero)]
while datum_columns_with_names[0].0.len() > 0 {
let mut row_predicate = AlwaysTrue;
for &mut (ref mut column, ref field_name) in &mut datum_columns_with_names {
let mut key_values = Vec::with_capacity(datum_columns_with_names.len());
for (column, _, _) in &mut datum_columns_with_names {
if let Some(item) = column.next() {
let cell_predicate = if let Some(datum) = item? {
Reference::new(field_name.clone()).equal_to(datum.clone())
} else {
Reference::new(field_name.clone()).is_null()
};
row_predicate = row_predicate.and(cell_predicate)
key_values.push(item?);
}
}
row_predicates.push(row_predicate.not().rewrite_not());
delete_set.keys.insert(EqDeleteKey(key_values));
}
}

// All row predicates are combined to a single predicate by creating a balanced binary tree.
// Using a simple fold would result in a deeply nested predicate that can cause a stack overflow.
while row_predicates.len() > 1 {
let mut next_level = Vec::with_capacity(row_predicates.len().div_ceil(2));
let mut iter = row_predicates.into_iter();
while let Some(p1) = iter.next() {
if let Some(p2) = iter.next() {
next_level.push(p1.and(p2));
} else {
next_level.push(p1);
}
}
row_predicates = next_level;
}

match row_predicates.pop() {
Some(p) => Ok(p),
None => Ok(AlwaysTrue),
}
Ok(eq_delete_set.unwrap_or_else(|| EqDeleteSet::new(Vec::new())))
}
}
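
On the read path, checking whether a data row is deleted then reduces to a single hash lookup. A hedged sketch (hypothetical helper, not code from this change), assuming the caller has already extracted the row's cells for the columns listed in `fields`, in order:

/// Returns true if the row's key tuple appears in the equality delete set.
fn row_is_deleted(set: &EqDeleteSet, row_cells: Vec<Option<Datum>>) -> bool {
    set.keys.contains(&EqDeleteKey(row_cells))
}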

struct EqDelColumnProcessor<'a> {
equality_ids: &'a HashSet<i32>,
collected_columns: Vec<(ArrayRef, String, Type)>,
collected_columns: Vec<(ArrayRef, String, i32, Type)>,
}

impl<'a> EqDelColumnProcessor<'a> {
@@ -441,18 +474,20 @@ impl<'a> EqDelColumnProcessor<'a> {
}
}

/// Produces per-column Datum iterators alongside (field_name, field_id) metadata.
#[allow(clippy::type_complexity)]
fn finish(
self,
) -> Result<
Vec<(
Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>>>,
String,
i32,
)>,
> {
self.collected_columns
.into_iter()
.map(|(array, field_name, field_type)| {
.map(|(array, field_name, field_id, field_type)| {
let primitive_type = field_type
.as_primitive_type()
.ok_or_else(|| {
@@ -477,7 +512,7 @@ impl<'a> EqDelColumnProcessor<'a> {
.transpose()
}));

Ok((datum_iterator, field_name))
Ok((datum_iterator, field_name, field_id))
})
.collect::<Result<Vec<_>>>()
}
Expand All @@ -495,6 +530,7 @@ impl SchemaWithPartnerVisitor<ArrayRef> for EqDelColumnProcessor<'_> {
self.collected_columns.push((
partner.clone(),
field.name.clone(),
field.id,
field.field_type.as_ref().clone(),
));
}
@@ -629,11 +665,58 @@ mod tests {
)
.await
.expect("error parsing batch stream");
println!("{parsed_eq_delete}");

let expected = "(((((y != 1) OR (z != 100)) OR (a != \"HELP\")) OR (sa != 4)) OR (b != 62696E6172795F64617461)) AND (((((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL)) OR (sa != 5)) OR (b IS NOT NULL))".to_string();
// The delete file has 2 rows, so we expect 2 keys in the set
assert_eq!(parsed_eq_delete.keys.len(), 2);

// Field metadata should list the 5 equality columns (y, z, a, sa, b)
assert_eq!(parsed_eq_delete.fields.len(), 5);
let field_names: Vec<&str> = parsed_eq_delete
.fields
.iter()
.map(|(n, _)| n.as_str())
.collect();
assert!(field_names.contains(&"y"));
assert!(field_names.contains(&"z"));
assert!(field_names.contains(&"a"));
assert!(field_names.contains(&"sa"));
assert!(field_names.contains(&"b"));

// Row 1: y=1, z=100, a="HELP", sa=4, b=binary_data
let row1 = EqDeleteKey(vec![
Some(Datum::long(1)),
Some(Datum::long(100)),
Some(Datum::string("HELP")),
Some(Datum::int(4)),
Some(Datum::binary(b"binary_data".to_vec())),
]);
assert!(
parsed_eq_delete.keys.contains(&row1),
"Row 1 should be in delete set"
);

// Row 2: y=2, z=NULL, a=NULL, sa=5, b=NULL
let row2 = EqDeleteKey(vec![
Some(Datum::long(2)),
None,
None,
Some(Datum::int(5)),
None,
]);
assert!(
parsed_eq_delete.keys.contains(&row2),
"Row 2 should be in delete set"
);

assert_eq!(parsed_eq_delete.to_string(), expected);
// A non-existent key should not be in the set
let non_existent = EqDeleteKey(vec![
Some(Datum::long(999)),
Some(Datum::long(0)),
Some(Datum::string("NOPE")),
Some(Datum::int(0)),
Some(Datum::binary(b"nope".to_vec())),
]);
assert!(!parsed_eq_delete.keys.contains(&non_existent));
}

/// Create a simple field with metadata.
Expand Down Expand Up @@ -955,13 +1038,19 @@ mod tests {

// Verify both delete types can be processed together
let result = delete_filter
.build_equality_delete_predicate(&file_scan_task)
.build_equality_delete_sets(&file_scan_task)
.await;
assert!(
result.is_ok(),
"Failed to build equality delete predicate: {:?}",
"Failed to build equality delete sets: {:?}",
result.err()
);
// The equality delete sets should contain delete keys
let eq_sets = result.unwrap();
assert!(
!eq_sets.is_empty(),
"Expected at least one equality delete set"
);
}

#[tokio::test]