Commit 4c94e86
[incremental scan] Support for equality deletes (#60)
* [incremental scan] Support for equality deletes
* Clippy fix
* Refactor
* .
* Small optimization
* Clippy fix
* Fix deadlock
* Add test
* Code reuse
* Add test
* Cleanup
* Some PR comments
* Enable row group filtering
* PR comments
* Factor out common code
* Clippy fix
* Tighten method visibility
* .
* Unify some more
* Format
* .
* Unify more
* Unify more, add partition info to append task
* .
* Unify predicate application
* .
* Allow in tests for multiple eq columns
* Add test
* Remove rewrite
* Add case_sensitive
* .
* Add helper
* Format
* Fix clippy
* Some PR comments
* Make big helper
* Split up more
* Inline small helpers
* Reinsert comment
* Narrow visibility
* Remove mut
* .
* Format
1 parent c49f7e9 commit 4c94e86

7 files changed

Lines changed: 1737 additions & 394 deletions

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 33 additions & 13 deletions
@@ -204,22 +204,15 @@ impl DeleteFilter {
         }
     }
 
-    /// Builds eq delete predicate for the provided task.
-    pub(crate) async fn build_equality_delete_predicate(
+    /// Builds combined predicate from a list of equality delete files.
+    /// Retrieves predicates for each file, combines them with AND logic.
+    pub(crate) async fn build_combined_equality_delete_predicate(
         &self,
-        file_scan_task: &FileScanTask,
-    ) -> Result<Option<BoundPredicate>> {
-        // * Filter the task's deletes into just the Equality deletes
-        // * Retrieve the unbound predicate for each from self.state.equality_deletes
-        // * Logical-AND them all together to get a single combined `Predicate`
-        // * Bind the predicate to the task's schema to get a `BoundPredicate`
-
+        equality_delete_files: &[FileScanTaskDeleteFile],
+    ) -> Result<Option<Predicate>> {
         let mut combined_predicate = AlwaysTrue;
-        for delete in &file_scan_task.deletes {
-            if !is_equality_delete(delete) {
-                continue;
-            }
 
+        for delete in equality_delete_files {
             let Some(predicate) = self
                 .get_equality_delete_predicate_for_delete_file_path(&delete.file_path)
                 .await
@@ -240,6 +233,33 @@ impl DeleteFilter {
             return Ok(None);
         }
 
+        Ok(Some(combined_predicate))
+    }
+
+    /// Builds eq delete predicate for the provided task.
+    pub(crate) async fn build_equality_delete_predicate(
+        &self,
+        file_scan_task: &FileScanTask,
+    ) -> Result<Option<BoundPredicate>> {
+        // Filter the task's deletes into just the Equality deletes
+        let equality_deletes: Vec<FileScanTaskDeleteFile> = file_scan_task
+            .deletes
+            .iter()
+            .filter(|delete| is_equality_delete(delete))
+            .cloned()
+            .collect();
+
+        if equality_deletes.is_empty() {
+            return Ok(None);
+        }
+
+        let Some(combined_predicate) = self
+            .build_combined_equality_delete_predicate(&equality_deletes)
+            .await?
+        else {
+            return Ok(None);
+        };
+
         let bound_predicate = combined_predicate
             .bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?;
         Ok(Some(bound_predicate))
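
Note: the new helper folds the per-file predicates together with logical AND, starting from AlwaysTrue, which is the identity for AND. A minimal standalone sketch of that fold, assuming the crate's Predicate::and combinator; the combine_all name is illustrative, not part of the crate:

use iceberg::expr::Predicate;

// Fold a list of per-file equality delete predicates into one, the way
// the loop in build_combined_equality_delete_predicate does. Starting
// from AlwaysTrue means an empty list combines to AlwaysTrue.
fn combine_all(predicates: Vec<Predicate>) -> Predicate {
    predicates
        .into_iter()
        .fold(Predicate::AlwaysTrue, |acc, p| acc.and(p))
}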

crates/iceberg/src/arrow/incremental.rs

Lines changed: 171 additions & 85 deletions
@@ -23,20 +23,18 @@ use arrow_schema::Schema as ArrowSchema;
 use futures::channel::mpsc::channel;
 use futures::stream::select;
 use futures::{Stream, StreamExt, TryStreamExt};
-use parquet::arrow::arrow_reader::ArrowReaderOptions;
 
 use crate::arrow::reader::process_record_batch_stream;
-use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
 use crate::arrow::{ArrowReader, StreamsInto};
 use crate::delete_vector::DeleteVector;
+use crate::expr::Bind;
 use crate::io::FileIO;
-use crate::metadata_columns::{RESERVED_FIELD_ID_POS, row_pos_field};
+use crate::metadata_columns::row_pos_field;
 use crate::runtime::spawn;
 use crate::scan::ArrowRecordBatchStream;
 use crate::scan::incremental::{
-    AppendedFileScanTask, DeleteScanTask, IncrementalFileScanTaskStreams,
+    AppendedFileScanTask, DeleteScanTask, EqualityDeleteScanTask, IncrementalFileScanTaskStreams,
 };
-use crate::spec::{Datum, PrimitiveType};
 use crate::{Error, ErrorKind, Result};
 
 /// Default batch size for incremental delete operations.
@@ -64,89 +62,52 @@ async fn process_incremental_append_task(
     file_io: FileIO,
     metadata_size_hint: Option<usize>,
 ) -> Result<ArrowRecordBatchStream> {
-    let mut virtual_columns = Vec::new();
-
-    // Check if _pos column is requested and add it as a virtual column
-    let has_pos_column = task.base.project_field_ids.contains(&RESERVED_FIELD_ID_POS);
-    if has_pos_column {
-        // Add _pos as a virtual column to be produced by the Parquet reader
-        virtual_columns.push(Arc::clone(row_pos_field()));
-    }
-
-    let arrow_reader_options = if !virtual_columns.is_empty() {
-        Some(ArrowReaderOptions::new().with_virtual_columns(virtual_columns.clone())?)
-    } else {
-        None
-    };
-
-    let mut record_batch_stream_builder = ArrowReader::create_parquet_record_batch_stream_builder(
-        &task.base.data_file_path,
+    let AppendedFileScanTask {
+        base,
+        positional_deletes,
+        equality_delete_predicate,
+    } = task;
+
+    let should_load_page_index =
+        equality_delete_predicate.is_some() || positional_deletes.is_some();
+    let equality_delete_bound = equality_delete_predicate
+        .map(|p| p.bind(base.schema.clone(), base.case_sensitive))
+        .transpose()?;
+
+    let (builder, has_missing_field_ids) = ArrowReader::open_parquet_stream_builder(
+        &base.data_file_path,
+        base.file_size_in_bytes,
         file_io,
-        true,
-        arrow_reader_options,
+        should_load_page_index,
+        ArrowReader::build_virtual_columns(&base.project_field_ids),
         metadata_size_hint,
-        task.base.file_size_in_bytes,
+        batch_size,
+        None, // name_mapping not yet supported in incremental scan
     )
     .await?;
 
-    // Create a projection mask for the batch stream to select which columns in the
-    // Parquet file that we want in the response
-    let projection_mask = ArrowReader::get_arrow_projection_mask(
-        &task.base.project_field_ids,
-        &task.schema_ref(),
-        record_batch_stream_builder.parquet_schema(),
-        record_batch_stream_builder.schema(),
-        false, // use_fallback
+    let builder = ArrowReader::apply_parquet_filters(
+        builder,
+        base.start,
+        base.length,
+        &base.schema,
+        equality_delete_bound.as_ref(),
+        positional_deletes.as_deref(),
+        true,  // row_group_filtering_enabled
+        true,  // row_selection_enabled
+        false, // use_predicate_projection: projection applied separately via build_projected_record_batch_stream
+        has_missing_field_ids,
     )?;
-    record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);
-
-    // RecordBatchTransformer performs any transformations required on the RecordBatches
-    // that come back from the file, such as type promotion, default column insertion,
-    // column re-ordering, and virtual field addition (like _file)
-    let datum = Datum::new(
-        PrimitiveType::String,
-        crate::spec::PrimitiveLiteral::String(task.base.data_file_path.clone()),
-    );
-    let mut record_batch_transformer_builder =
-        RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids)
-            .with_constant(crate::metadata_columns::RESERVED_FIELD_ID_FILE, datum);
-
-    if has_pos_column {
-        record_batch_transformer_builder =
-            record_batch_transformer_builder.with_virtual_field(Arc::clone(row_pos_field()))?;
-    }
-
-    let mut record_batch_transformer = record_batch_transformer_builder.build();
-
-    if let Some(batch_size) = batch_size {
-        record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
-    }
-
-    // Apply positional deletes as row selections.
-    let row_selection = if let Some(positional_delete_indexes) = task.positional_deletes {
-        Some(ArrowReader::build_deletes_row_selection(
-            record_batch_stream_builder.metadata().row_groups(),
-            &None,
-            &positional_delete_indexes.lock().unwrap(),
-        )?)
-    } else {
-        None
-    };
-
-    if let Some(row_selection) = row_selection {
-        record_batch_stream_builder = record_batch_stream_builder.with_row_selection(row_selection);
-    }
-
-    // Build the batch stream and send all the RecordBatches that it generates
-    // to the requester.
-    let record_batch_stream = record_batch_stream_builder
-        .build()?
-        .map(move |batch| match batch {
-            Ok(batch) => record_batch_transformer.process_record_batch(batch),
-            Err(err) => Err(err.into()),
-        });
 
-    Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
+    ArrowReader::build_projected_record_batch_stream(
+        builder,
+        &base.project_field_ids,
+        base.schema,
+        has_missing_field_ids,
+        &base.data_file_path,
+        base.partition_spec,
+        base.partition,
+    )
 }
 
 /// Helper function to create a RecordBatch from a chunk of position values.
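
Note: the rewritten function binds the optional equality delete predicate with the map(..).transpose()? idiom, which turns an Option<Result<T, E>> into a Result<Option<T>, E> so the ? operator can surface a binding error while preserving the "no predicate" case. A minimal self-contained sketch of the idiom with toy types, not the crate's API:

// Hypothetical: fail on negative input, pass None straight through,
// mirroring equality_delete_predicate.map(|p| p.bind(..)).transpose()?.
fn double_non_negative(input: Option<i32>) -> Result<Option<i32>, String> {
    input
        .map(|n| if n >= 0 { Ok(n * 2) } else { Err(format!("negative: {n}")) })
        .transpose()
}

fn main() {
    assert_eq!(double_non_negative(None), Ok(None));
    assert_eq!(double_non_negative(Some(4)), Ok(Some(8)));
    assert!(double_non_negative(Some(-1)).is_err());
}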
@@ -183,7 +144,7 @@ fn process_incremental_delete_task(
 ) -> Result<ArrowRecordBatchStream> {
     let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
 
-    // Create schema with _file column first, then pos (Int64)
+    // Create schema with file_path column first, then pos (Int64)
     let schema = Arc::new(ArrowSchema::new(vec![
         Arc::clone(crate::metadata_columns::file_path_field()),
         Arc::clone(crate::metadata_columns::pos_field_arrow()),
@@ -205,7 +166,7 @@ fn process_incremental_deleted_file_task(
 ) -> Result<ArrowRecordBatchStream> {
     let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
 
-    // Create schema with _file column first, then pos (Int64)
+    // Create schema with file_path column first, then pos (Int64)
     let schema = Arc::new(ArrowSchema::new(vec![
         Arc::clone(crate::metadata_columns::file_path_field()),
         Arc::clone(crate::metadata_columns::pos_field_arrow()),
@@ -219,6 +180,111 @@ fn process_incremental_deleted_file_task(
     Ok(Box::pin(stream) as ArrowRecordBatchStream)
 }
 
+/// Process equality delete task by reading the data file with equality delete predicates applied
+/// as a row filter, and emitting record batches containing matching row positions.
+async fn process_equality_delete_task(
+    task: EqualityDeleteScanTask,
+    batch_size: Option<usize>,
+    file_io: FileIO,
+    metadata_size_hint: Option<usize>,
+) -> Result<ArrowRecordBatchStream> {
+    let file_path = task.data_file_path().to_string();
+
+    // Create output schema with file_path column first, then pos (Int64)
+    let output_schema = Arc::new(ArrowSchema::new(vec![
+        Arc::clone(crate::metadata_columns::file_path_field()),
+        Arc::clone(crate::metadata_columns::pos_field_arrow()),
+    ]));
+
+    let bound_predicate = task
+        .combined_predicate
+        .bind(task.schema_ref(), task.base.case_sensitive)?;
+
+    let (builder, has_missing_field_ids) = ArrowReader::open_parquet_stream_builder(
+        &task.base.data_file_path,
+        task.base.file_size_in_bytes,
+        file_io,
+        true, // always load page index: we always have a predicate
+        vec![Arc::clone(row_pos_field())],
+        metadata_size_hint,
+        batch_size,
+        None, // name_mapping not yet supported in incremental scan
+    )
+    .await?;
+
+    let builder = ArrowReader::apply_parquet_filters(
+        builder,
+        task.base.start,
+        task.base.length,
+        &task.base.schema,
+        Some(&bound_predicate),
+        None,  // no positional deletes for equality delete tasks
+        true,  // row_group_filtering_enabled
+        false, // row_selection_enabled
+        true,  // use_predicate_projection: project to predicate columns only
+        has_missing_field_ids,
+    )?;
+
+    // Build the stream of filtered records
+    let record_batch_stream = builder.build()?;
+
+    // Extract positions from the _pos column and emit delete batches
+    let output_schema_clone = output_schema.clone();
+    let file_path_clone = file_path.clone();
+
+    let stream = record_batch_stream
+        .then(move |batch_result| {
+            let schema = output_schema_clone.clone();
+            let path = file_path_clone.clone();
+            async move {
+                match batch_result {
+                    Ok(batch) => {
+                        // Extract _pos column (last column due to virtual_columns).
+                        // _pos is always non-null: it is a virtual column representing the
+                        // physical row position, produced by the Parquet reader for every row.
+                        let pos_col = batch
+                            .column(batch.num_columns() - 1)
+                            .as_any()
+                            .downcast_ref::<Int64Array>()
+                            .ok_or_else(|| {
+                                Error::new(
+                                    ErrorKind::Unexpected,
+                                    "Failed to extract _pos column from equality delete batch",
+                                )
+                            })?;
+
+                        let num_rows = pos_col.len();
+                        if num_rows == 0 {
+                            return Ok(RecordBatch::new_empty(Arc::clone(&schema)));
+                        }
+
+                        // Reuse the Int64Array directly as the pos column.
+                        // Build a matching file_path StringArray.
+                        let file_array = Arc::new(arrow_array::StringArray::from(vec![
+                            path.as_str();
+                            num_rows
+                        ]));
+                        RecordBatch::try_new(Arc::clone(&schema), vec![
+                            file_array,
+                            Arc::clone(batch.column(batch.num_columns() - 1)),
+                        ])
+                        .map_err(|e| {
+                            Error::new(
+                                ErrorKind::Unexpected,
+                                format!("Failed to create equality delete batch: {e}"),
+                            )
+                        })
+                    }
+                    Err(e) => Err(e.into()),
+                }
+            }
+        })
+        .boxed();
+
+    Ok(Box::pin(stream) as ArrowRecordBatchStream)
+}
+
 impl StreamsInto<ArrowReader, CombinedIncrementalBatchRecordStream>
     for IncrementalFileScanTaskStreams
 {
{
@@ -252,11 +318,11 @@ impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
252318
let (append_stream, delete_stream) = self;
253319

254320
// Process append tasks
255-
let file_io = reader.file_io.clone();
321+
let file_io_append = reader.file_io.clone();
256322
spawn(async move {
257323
let _ = append_stream
258324
.try_for_each_concurrent(reader.concurrency_limit_data_files, |append_task| {
259-
let file_io = file_io.clone();
325+
let file_io = file_io_append.clone();
260326
let appends_tx = appends_tx.clone();
261327
async move {
262328
spawn(async move {
@@ -282,10 +348,12 @@ impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
282348
});
283349

284350
// Process delete tasks
351+
let file_io_delete = reader.file_io.clone();
285352
spawn(async move {
286353
let _ = delete_stream
287354
.try_for_each_concurrent(reader.concurrency_limit_data_files, |delete_task| {
288355
let deletes_tx = deletes_tx.clone();
356+
let file_io = file_io_delete.clone();
289357
async move {
290358
match delete_task {
291359
DeleteScanTask::DeletedFile(deleted_file_task) => {
@@ -324,6 +392,24 @@ impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
324392
.await;
325393
});
326394
}
395+
DeleteScanTask::EqualityDeletes(equality_delete_task) => {
396+
spawn(async move {
397+
let record_batch_stream = process_equality_delete_task(
398+
equality_delete_task,
399+
batch_size,
400+
file_io.clone(),
401+
metadata_size_hint,
402+
)
403+
.await;
404+
405+
process_record_batch_stream(
406+
record_batch_stream,
407+
deletes_tx,
408+
"failed to read equality delete record batch",
409+
)
410+
.await;
411+
});
412+
}
327413
}
328414
Ok(())
329415
}
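
Note: the renamed file_io_append / file_io_delete clones exist because each spawned block takes ownership by move, so a single file_io binding could only be moved into one of them. A minimal sketch of the pattern using plain threads, with a String standing in for the cheaply-cloneable FileIO handle; names are illustrative:

use std::thread;

// Hypothetical: two detached workers each need to own the handle,
// so clone once per worker before the move.
fn spawn_workers(file_io: String) {
    let file_io_append = file_io.clone();
    let append = thread::spawn(move || {
        println!("append worker uses {file_io_append}");
    });

    let file_io_delete = file_io;
    let delete = thread::spawn(move || {
        println!("delete worker uses {file_io_delete}");
    });

    append.join().unwrap();
    delete.join().unwrap();
}

fn main() {
    spawn_workers("s3://bucket/warehouse".to_string());
}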
