
Commit 29cf9f6

perf(reader): replace O(N*M) equality delete predicate tree with O(N+M) HashSet-based filter

The existing equality delete implementation builds a predicate AST with one node per delete record and evaluates every node against every data batch. For N delete records and M data rows, this is O(N*M): 25 minutes for 30M rows with ~5000 delete records in our benchmark.

Replace the predicate tree with a `HashSet<EqDeleteKey>` of delete key tuples, matching the approach used by the Java/Spark implementation (`StructLikeSet`). Delete records are collected into a hash set during parsing (O(N)), then each data row is checked with an O(1) lookup.

Key changes (each sketched below):

- `caching_delete_file_loader.rs`: `parse_equality_deletes_record_batch_stream` now returns an `EqDeleteSet` (hash set + field metadata) instead of a `Predicate`. The balanced binary tree construction and `rewrite_not()` calls are eliminated entirely. `Datum` already derives `Hash` and `Eq` (via `OrderedFloat` for floats), so no new trait implementations are needed.
- `delete_filter.rs`: `EqDelState::Loaded` holds `Arc<EqDeleteSet>` instead of `Predicate`. The new `build_equality_delete_sets()` groups delete files by their `equality_ids` field layout before unioning, preventing incorrect merges when different delete files use different equality column sets. Single-file groups return the cached `Arc` directly with no deep clone.
- `reader.rs`: Equality delete filtering is decoupled from the scan predicate `RowFilter`. The scan predicate stays in the Parquet `RowFilter` pipeline (page/row-group pruning preserved). Equality deletes are applied as a lazy post-read `.map()` step on the record batch stream via `apply_eq_delete_filter()`, which reuses a single `EqDeleteKey` allocation across all rows to avoid per-row `Vec` allocations.
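For intuition, here is a minimal, self-contained Rust sketch of the technique; it is not the crate's API. `Datum` is stood in by `i64`, the `build`/`keep_mask` helpers are hypothetical names, and the real code extracts only the `equality_ids` columns from each Arrow batch:

```rust
use std::collections::HashSet;

// Stand-in for the crate's `Datum`, which already derives `Hash`/`Eq`
// (floats via `OrderedFloat`), so keys can go straight into a HashSet.
type Datum = i64;

/// One key per delete record; element order follows the equality_id columns.
#[derive(Hash, Eq, PartialEq, Clone)]
struct EqDeleteKey(Vec<Option<Datum>>);

struct EqDeleteSet {
    keys: HashSet<EqDeleteKey>,
    fields: Vec<(String, i32)>, // (field_name, field_id) metadata
}

/// O(N): collect the N delete records into a hash set once, during parsing.
fn build(delete_rows: Vec<Vec<Option<Datum>>>, fields: Vec<(String, i32)>) -> EqDeleteSet {
    let keys = delete_rows.into_iter().map(EqDeleteKey).collect();
    EqDeleteSet { keys, fields }
}

/// O(M): one O(1) lookup per data row; `true` means "keep the row".
/// A single probe key is reused across rows (clear + extend keeps capacity),
/// mirroring the "single `EqDeleteKey` allocation across all rows" point above.
fn keep_mask(set: &EqDeleteSet, data_rows: &[Vec<Option<Datum>>]) -> Vec<bool> {
    let mut probe = EqDeleteKey(Vec::with_capacity(set.fields.len()));
    data_rows
        .iter()
        .map(|row| {
            probe.0.clear();
            probe.0.extend(row.iter().cloned());
            !set.keys.contains(&probe)
        })
        .collect()
}

fn main() {
    // Two delete records on a single equality column ("id", field id 1).
    let set = build(vec![vec![Some(1)], vec![Some(7)]], vec![("id".into(), 1)]);
    let data = vec![vec![Some(1)], vec![Some(2)], vec![Some(7)]];
    assert_eq!(keep_mask(&set, &data), vec![false, true, false]);
}
```

Because `Option<Datum>` compares `None == None`, a NULL in an equality column matches a NULL delete key element, which reproduces the old predicate's `IS NULL` handling (see the Row 2 test in the diff below).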
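The grouping rule from the `delete_filter.rs` bullet, sketched with the same toy types as above (the `group_and_union` helper is hypothetical; per the message, the real `build_equality_delete_sets()` operates on cached `Arc<EqDeleteSet>`s):

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

// `EqDeleteKey` / `EqDeleteSet` as in the previous sketch.
fn group_and_union(loaded: Vec<Arc<EqDeleteSet>>) -> Vec<Arc<EqDeleteSet>> {
    // Group by field-id layout so delete files with different equality
    // column sets are never unioned into one key space.
    let mut groups: HashMap<Vec<i32>, Vec<Arc<EqDeleteSet>>> = HashMap::new();
    for set in loaded {
        let layout: Vec<i32> = set.fields.iter().map(|&(_, id)| id).collect();
        groups.entry(layout).or_default().push(set);
    }
    groups
        .into_values()
        .map(|mut sets| {
            if sets.len() == 1 {
                // Single-file group: hand back the cached Arc, no deep clone.
                sets.pop().unwrap()
            } else {
                // Multi-file group: union all key sets under one layout.
                let mut keys: HashSet<EqDeleteKey> = HashSet::new();
                for s in &sets {
                    keys.extend(s.keys.iter().cloned());
                }
                Arc::new(EqDeleteSet {
                    keys,
                    fields: sets[0].fields.clone(),
                })
            }
        })
        .collect()
}
```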
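And a rough shape of the post-read step from the `reader.rs` bullet, assuming the common arrow-rs pattern of computing a boolean keep-mask and applying it with `filter_record_batch`. The helper name `drop_deleted_rows` and the single `Int64` key column are illustrative only, not the actual `apply_eq_delete_filter()` signature, and the real code treats NULLs as part of the key:

```rust
use std::collections::HashSet;
use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use arrow_select::filter::filter_record_batch;

/// Build a keep-mask via O(1) hash lookups, then filter the batch.
fn drop_deleted_rows(
    batch: &RecordBatch,
    deleted_ids: &HashSet<i64>,
) -> Result<RecordBatch, ArrowError> {
    let ids = batch
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("demo assumes an Int64 key column");
    // `true` keeps the row; nulls are kept here since no i64 key matches them.
    let keep: BooleanArray = ids
        .iter()
        .map(|v| Some(!matches!(v, Some(x) if deleted_ids.contains(&x))))
        .collect();
    filter_record_batch(batch, &keep)
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int64Array::from(vec![1_i64, 2, 7])) as ArrayRef],
    )?;
    let deleted: HashSet<i64> = [1, 7].into_iter().collect();
    let filtered = drop_deleted_rows(&batch, &deleted)?;
    assert_eq!(filtered.num_rows(), 1); // only id = 2 survives
    Ok(())
}
```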
1 parent fda82a2 commit 29cf9f6

3 files changed

Lines changed: 472 additions & 132 deletions

File tree

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 145 additions & 56 deletions
@@ -16,7 +16,6 @@
 // under the License.
 
 use std::collections::{HashMap, HashSet};
-use std::ops::Not;
 use std::sync::Arc;
 
 use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray};
@@ -27,8 +26,6 @@ use super::delete_filter::{DeleteFilter, PosDelLoadAction};
 use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
 use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema};
 use crate::delete_vector::DeleteVector;
-use crate::expr::Predicate::AlwaysTrue;
-use crate::expr::{Predicate, Reference};
 use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
 use crate::spec::{
@@ -38,6 +35,41 @@ use crate::spec::{
 };
 use crate::{Error, ErrorKind, Result};
 
+/// A composite key for equality delete lookups. Each element corresponds to one
+/// equality_id column. For single-column deletes this contains one element.
+#[derive(Hash, Eq, PartialEq, Debug, Clone)]
+pub(crate) struct EqDeleteKey(pub(crate) Vec<Option<Datum>>);
+
+/// Bundles the hash set of delete keys with the field metadata needed to extract
+/// matching keys from data record batches.
+#[derive(Debug, Clone)]
+pub(crate) struct EqDeleteSet {
+    /// Delete key tuples to filter out of data batches.
+    pub(crate) keys: HashSet<EqDeleteKey>,
+    /// Ordered list of (field_name, field_id) used to locate the key columns in
+    /// data record batches. The order matches the element order in `EqDeleteKey`.
+    pub(crate) fields: Vec<(String, i32)>,
+}
+
+impl EqDeleteSet {
+    fn new(fields: Vec<(String, i32)>) -> Self {
+        Self {
+            keys: HashSet::new(),
+            fields,
+        }
+    }
+
+    /// Returns true when the set contains no delete keys.
+    pub(crate) fn is_empty(&self) -> bool {
+        self.keys.is_empty()
+    }
+
+    /// Merge another set (with the same field layout) into this one.
+    pub(crate) fn union(&mut self, other: &EqDeleteSet) {
+        self.keys.extend(other.keys.iter().cloned());
+    }
+}
+
 #[derive(Clone, Debug)]
 pub(crate) struct CachingDeleteFileLoader {
     basic_delete_file_loader: BasicDeleteFileLoader,
@@ -59,7 +91,7 @@ enum DeleteFileContext {
     FreshEqDel {
         batch_stream: ArrowRecordBatchStream,
         equality_ids: HashSet<i32>,
-        sender: tokio::sync::oneshot::Sender<Predicate>,
+        sender: tokio::sync::oneshot::Sender<Arc<EqDeleteSet>>,
     },
 }
 
@@ -99,16 +131,17 @@ impl CachingDeleteFileLoader {
     /// another concurrently processing data file scan task. If it is, we skip it.
     /// If not, the DeleteFilter is updated to contain a notifier to prevent other data file
     /// tasks from starting to load the same equality delete file. We spawn a task to load
-    /// the EQ delete's record batch stream, convert it to a predicate, update the delete filter,
-    /// and notify any task that was waiting for it.
+    /// the EQ delete's record batch stream, convert it to an `EqDeleteSet` (hash set of
+    /// delete key tuples), update the delete filter, and notify any task that was waiting
+    /// for it.
     /// * When this gets updated to add support for delete vectors, the load phase will return
     ///   a PuffinReader for them.
     /// * The parse phase parses each record batch stream according to its associated data type.
     ///   The result of this is a map of data file paths to delete vectors for the positional
     ///   delete tasks (and in future for the delete vector tasks). For equality delete
-    ///   file tasks, this results in an unbound Predicate.
-    /// * The unbound Predicates resulting from equality deletes are sent to their associated oneshot
-    ///   channel to store them in the right place in the delete file managers state.
+    ///   file tasks, this results in an `EqDeleteSet` (hash set of delete key tuples).
+    /// * The `EqDeleteSet`s resulting from equality deletes are sent to their associated oneshot
+    ///   channel to store them in the right place in the delete file manager's state.
     /// * The results of all of these futures are awaited on in parallel with the specified
     ///   level of concurrency and collected into a vec. We then combine all the delete
     ///   vector maps that resulted from any positional delete or delete vector files into a
@@ -130,7 +163,7 @@
     ///    Pos Del                  Del Vec (Not yet Implemented)          EQ Del
     ///       |                              |                               |
     ///  [parse pos del stream]    [parse del vec puffin]              [parse eq del]
-    ///  HashMap<String, RoaringTreeMap>  HashMap<String, RoaringTreeMap>  (Predicate, Sender)
+    ///  HashMap<String, RoaringTreeMap>  HashMap<String, RoaringTreeMap>  (EqDeleteSet, Sender)
     ///       |                              |                               |
     ///       |                              |                       [persist to state]
     ///       |                              |                               ()
@@ -249,9 +282,17 @@
         let (sender, receiver) = channel();
         del_filter.insert_equality_delete(&task.file_path, receiver);
 
-        // Per the Iceberg spec, evolve schema for equality deletes but only for the
-        // equality_ids columns, not all table columns.
-        let equality_ids_vec = task.equality_ids.clone().unwrap();
+        // Per the Iceberg spec, equality_ids is required for equality delete files.
+        // Evolve schema only for the equality_ids columns, not all table columns.
+        let equality_ids_vec = task.equality_ids.clone().ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "equality_ids is required for equality delete file '{}' but was not set",
+                    task.file_path
+                ),
+            )
+        })?;
         let evolved_stream = BasicDeleteFileLoader::evolve_schema(
             basic_delete_file_loader
                 .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
@@ -293,16 +334,16 @@
                 batch_stream,
                 equality_ids,
             } => {
-                let predicate =
+                let eq_delete_set =
                     Self::parse_equality_deletes_record_batch_stream(batch_stream, equality_ids)
                         .await?;
 
                 sender
-                    .send(predicate)
+                    .send(Arc::new(eq_delete_set))
                    .map_err(|err| {
                        Error::new(
                            ErrorKind::Unexpected,
-                            "Could not send eq delete predicate to state",
+                            "Could not send eq delete set to state",
                        )
                    })
                    .map(|_| ParsedDeleteFileContext::EqDel)
@@ -354,19 +395,23 @@
         Ok(result)
     }
 
+    /// Parses equality delete record batches into a hash-based delete set.
+    ///
+    /// We collect delete key tuples into a `HashSet` for O(1) per-row lookups.
     async fn parse_equality_deletes_record_batch_stream(
         mut stream: ArrowRecordBatchStream,
         equality_ids: HashSet<i32>,
-    ) -> Result<Predicate> {
-        let mut row_predicates = Vec::new();
+    ) -> Result<EqDeleteSet> {
         let mut batch_schema_iceberg: Option<Schema> = None;
         let accessor = EqDelRecordBatchPartnerAccessor;
+        // Discover field metadata from the first non-empty batch.
+        let mut eq_delete_set: Option<EqDeleteSet> = None;
 
         while let Some(record_batch) = stream.next().await {
             let record_batch = record_batch?;
 
             if record_batch.num_columns() == 0 {
-                return Ok(AlwaysTrue);
+                return Ok(EqDeleteSet::new(Vec::new()));
             }
 
             let schema = match &batch_schema_iceberg {
@@ -388,49 +433,37 @@
                 continue;
             }
 
-            // Process the collected columns in lockstep
+            // Lazily initialize the EqDeleteSet with field metadata from the
+            // first batch that has columns. The field order is stable across
+            // batches because it comes from the delete file schema.
+            let delete_set = eq_delete_set.get_or_insert_with(|| {
+                let fields = datum_columns_with_names
+                    .iter()
+                    .map(|(_, name, field_id)| (name.clone(), *field_id))
+                    .collect();
+                EqDeleteSet::new(fields)
+            });
+
+            // Collect delete key tuples by iterating all columns in lockstep.
             #[allow(clippy::len_zero)]
             while datum_columns_with_names[0].0.len() > 0 {
-                let mut row_predicate = AlwaysTrue;
-                for &mut (ref mut column, ref field_name) in &mut datum_columns_with_names {
+                let mut key_values = Vec::with_capacity(datum_columns_with_names.len());
+                for (column, _, _) in &mut datum_columns_with_names {
                     if let Some(item) = column.next() {
-                        let cell_predicate = if let Some(datum) = item? {
-                            Reference::new(field_name.clone()).equal_to(datum.clone())
-                        } else {
-                            Reference::new(field_name.clone()).is_null()
-                        };
-                        row_predicate = row_predicate.and(cell_predicate)
+                        key_values.push(item?);
                    }
                }
-                row_predicates.push(row_predicate.not().rewrite_not());
+                delete_set.keys.insert(EqDeleteKey(key_values));
            }
        }
 
-        // All row predicates are combined to a single predicate by creating a balanced binary tree.
-        // Using a simple fold would result in a deeply nested predicate that can cause a stack overflow.
-        while row_predicates.len() > 1 {
-            let mut next_level = Vec::with_capacity(row_predicates.len().div_ceil(2));
-            let mut iter = row_predicates.into_iter();
-            while let Some(p1) = iter.next() {
-                if let Some(p2) = iter.next() {
-                    next_level.push(p1.and(p2));
-                } else {
-                    next_level.push(p1);
-                }
-            }
-            row_predicates = next_level;
-        }
-
-        match row_predicates.pop() {
-            Some(p) => Ok(p),
-            None => Ok(AlwaysTrue),
-        }
+        Ok(eq_delete_set.unwrap_or_else(|| EqDeleteSet::new(Vec::new())))
     }
 }
 
 struct EqDelColumnProcessor<'a> {
     equality_ids: &'a HashSet<i32>,
-    collected_columns: Vec<(ArrayRef, String, Type)>,
+    collected_columns: Vec<(ArrayRef, String, i32, Type)>,
 }
 
 impl<'a> EqDelColumnProcessor<'a> {
@@ -441,18 +474,20 @@ impl<'a> EqDelColumnProcessor<'a> {
         }
     }
 
+    /// Produces per-column Datum iterators alongside (field_name, field_id) metadata.
     #[allow(clippy::type_complexity)]
     fn finish(
         self,
     ) -> Result<
         Vec<(
             Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>>>,
             String,
+            i32,
        )>,
    > {
        self.collected_columns
            .into_iter()
-            .map(|(array, field_name, field_type)| {
+            .map(|(array, field_name, field_id, field_type)| {
                let primitive_type = field_type
                    .as_primitive_type()
                    .ok_or_else(|| {
@@ -477,7 +512,7 @@
                         .transpose()
                 }));
 
-                Ok((datum_iterator, field_name))
+                Ok((datum_iterator, field_name, field_id))
             })
             .collect::<Result<Vec<_>>>()
     }
@@ -495,6 +530,7 @@ impl SchemaWithPartnerVisitor<ArrayRef> for EqDelColumnProcessor<'_> {
             self.collected_columns.push((
                 partner.clone(),
                 field.name.clone(),
+                field.id,
                 field.field_type.as_ref().clone(),
             ));
         }
@@ -629,11 +665,58 @@ mod tests {
         )
         .await
         .expect("error parsing batch stream");
-        println!("{parsed_eq_delete}");
 
-        let expected = "(((((y != 1) OR (z != 100)) OR (a != \"HELP\")) OR (sa != 4)) OR (b != 62696E6172795F64617461)) AND (((((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL)) OR (sa != 5)) OR (b IS NOT NULL))".to_string();
+        // The delete file has 2 rows, so we expect 2 keys in the set
+        assert_eq!(parsed_eq_delete.keys.len(), 2);
+
+        // Field metadata should list the 5 equality columns (y, z, a, sa, b)
+        assert_eq!(parsed_eq_delete.fields.len(), 5);
+        let field_names: Vec<&str> = parsed_eq_delete
+            .fields
+            .iter()
+            .map(|(n, _)| n.as_str())
+            .collect();
+        assert!(field_names.contains(&"y"));
+        assert!(field_names.contains(&"z"));
+        assert!(field_names.contains(&"a"));
+        assert!(field_names.contains(&"sa"));
+        assert!(field_names.contains(&"b"));
+
+        // Row 1: y=1, z=100, a="HELP", sa=4, b=binary_data
+        let row1 = EqDeleteKey(vec![
+            Some(Datum::long(1)),
+            Some(Datum::long(100)),
+            Some(Datum::string("HELP")),
+            Some(Datum::int(4)),
+            Some(Datum::binary(b"binary_data".to_vec())),
+        ]);
+        assert!(
+            parsed_eq_delete.keys.contains(&row1),
+            "Row 1 should be in delete set"
+        );
+
+        // Row 2: y=2, z=NULL, a=NULL, sa=5, b=NULL
+        let row2 = EqDeleteKey(vec![
+            Some(Datum::long(2)),
+            None,
+            None,
+            Some(Datum::int(5)),
+            None,
+        ]);
+        assert!(
+            parsed_eq_delete.keys.contains(&row2),
+            "Row 2 should be in delete set"
+        );
 
-        assert_eq!(parsed_eq_delete.to_string(), expected);
+        // A non-existent key should not be in the set
+        let non_existent = EqDeleteKey(vec![
+            Some(Datum::long(999)),
+            Some(Datum::long(0)),
+            Some(Datum::string("NOPE")),
+            Some(Datum::int(0)),
+            Some(Datum::binary(b"nope".to_vec())),
+        ]);
+        assert!(!parsed_eq_delete.keys.contains(&non_existent));
     }
 
     /// Create a simple field with metadata.
@@ -955,13 +1038,19 @@
 
         // Verify both delete types can be processed together
         let result = delete_filter
-            .build_equality_delete_predicate(&file_scan_task)
+            .build_equality_delete_sets(&file_scan_task)
            .await;
        assert!(
            result.is_ok(),
-            "Failed to build equality delete predicate: {:?}",
+            "Failed to build equality delete sets: {:?}",
            result.err()
        );
+        // The equality delete sets should contain delete keys
+        let eq_sets = result.unwrap();
+        assert!(
+            !eq_sets.is_empty(),
+            "Expected at least one equality delete set"
+        );
     }
 
     #[tokio::test]
