@@ -29,12 +29,13 @@ use crate::arrow::reader::process_record_batch_stream;
 use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
 use crate::arrow::{ArrowReader, StreamsInto};
 use crate::delete_vector::DeleteVector;
+use crate::expr::Bind;
 use crate::io::FileIO;
 use crate::metadata_columns::{RESERVED_FIELD_ID_POS, row_pos_field};
 use crate::runtime::spawn;
 use crate::scan::ArrowRecordBatchStream;
 use crate::scan::incremental::{
-    AppendedFileScanTask, DeleteScanTask, IncrementalFileScanTaskStreams,
+    AppendedFileScanTask, DeleteScanTask, EqualityDeleteScanTask, IncrementalFileScanTaskStreams,
 };
 use crate::spec::{Datum, PrimitiveType};
 use crate::{Error, ErrorKind, Result};
@@ -123,7 +124,7 @@ async fn process_incremental_append_task(
     }

     // Apply positional deletes as row selections.
-    let row_selection = if let Some(positional_delete_indexes) = task.positional_deletes {
+    let row_selection = if let Some(ref positional_delete_indexes) = task.positional_deletes {
         Some(ArrowReader::build_deletes_row_selection(
             record_batch_stream_builder.metadata().row_groups(),
             &None,
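
For intuition about the `build_deletes_row_selection` step above: a set of deleted row positions becomes a parquet `RowSelection` that alternates between selecting and skipping runs of rows. Below is a minimal sketch of the idea only, assuming sorted, de-duplicated in-file positions; the real `ArrowReader::build_deletes_row_selection` additionally accounts for the file's row-group layout.

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

/// Illustration only: turn sorted, de-duplicated deleted row positions into a
/// RowSelection that skips exactly those rows.
fn deletes_to_selection(deleted: &[u64], total_rows: u64) -> RowSelection {
    let mut selectors = Vec::new();
    let mut cursor = 0u64;
    for &pos in deleted {
        if pos > cursor {
            // Keep the rows between the previous delete and this one.
            selectors.push(RowSelector::select((pos - cursor) as usize));
        }
        // Skip the deleted row itself.
        selectors.push(RowSelector::skip(1));
        cursor = pos + 1;
    }
    if cursor < total_rows {
        // Keep everything after the last deleted position.
        selectors.push(RowSelector::select((total_rows - cursor) as usize));
    }
    RowSelection::from(selectors)
}
```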
@@ -137,6 +138,34 @@ async fn process_incremental_append_task(
         record_batch_stream_builder = record_batch_stream_builder.with_row_selection(row_selection);
     }

+    // Apply equality deletes as a row filter predicate.
+    if !task.equality_deletes.is_empty() {
+        // Build the combined equality delete predicate.
+        let combined_predicate = task
+            .delete_filter
+            .build_combined_equality_delete_predicate(&task.equality_deletes)
+            .await?;
+
+        // Bind the predicate to the schema.
+        let bound_predicate = combined_predicate.bind(
+            task.schema_ref(),
+            false, // case_sensitive - matches the behavior in reader.rs
+        )?;
+
+        let (iceberg_field_ids, field_id_map) = ArrowReader::build_field_id_set_and_map(
+            record_batch_stream_builder.parquet_schema(),
+            &bound_predicate,
+        )?;
+
+        let row_filter = ArrowReader::get_row_filter(
+            &bound_predicate,
+            record_batch_stream_builder.parquet_schema(),
+            &iceberg_field_ids,
+            &field_id_map,
+        )?;
+        record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
+    }
+
     // Build the batch stream and send all the RecordBatches that it generates
     // to the requester.
     let record_batch_stream = record_batch_stream_builder
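
For context: an equality delete file logically stores tuples of values for the delete's equality field IDs, and a data-file row is deleted if it matches any tuple. The combined predicate built above is therefore shaped like a disjunction of per-tuple equality predicates, negated when the caller wants the surviving rows. A hand-rolled sketch of that shape with iceberg-rust's expression API, assuming a single equality field `id`, delete tuples (1) and (5), and a `schema_ref` already in scope:

```rust
use iceberg::expr::{Bind, Predicate, Reference};
use iceberg::spec::Datum;

// Sketch only: rows matching any delete tuple count as deleted.
let matches_a_delete: Predicate = Reference::new("id")
    .equal_to(Datum::long(1))
    .or(Reference::new("id").equal_to(Datum::long(5)));

// Keeping live rows means negating the match predicate before binding it
// to the snapshot schema (case-insensitive, as in the diff above).
let keep_live_rows = matches_a_delete.negate();
let bound = keep_live_rows.bind(schema_ref, false)?;
```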
@@ -219,6 +248,108 @@ fn process_incremental_deleted_file_task(
     Ok(Box::pin(stream) as ArrowRecordBatchStream)
 }

+/// Process an equality delete task by reading the data file with the equality delete predicates
+/// applied as a row filter, and emitting record batches containing the matching row positions.
+async fn process_equality_delete_task(
+    task: EqualityDeleteScanTask,
+    batch_size: Option<usize>,
+    file_io: crate::io::FileIO,
+) -> Result<ArrowRecordBatchStream> {
+    let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
+    let file_path = task.data_file_path().to_string();
+
+    // Create the output schema with the _file column first, then pos (Int64).
+    let output_schema = Arc::new(ArrowSchema::new(vec![
+        Arc::clone(crate::metadata_columns::file_path_field()),
+        Arc::clone(crate::metadata_columns::pos_field_arrow()),
+    ]));
+
+    // Add the _pos virtual column to track row positions.
+    let virtual_columns = vec![Arc::clone(row_pos_field())];
+    let arrow_reader_options = ArrowReaderOptions::new().with_virtual_columns(virtual_columns)?;
+
+    // Create a parquet reader with virtual columns to get the schema and apply equality deletes.
+    let mut record_batch_stream_builder = ArrowReader::create_parquet_record_batch_stream_builder(
+        &file_path,
+        file_io,
+        true,
+        Some(arrow_reader_options),
+        None,
+        task.base.file_size_in_bytes,
+    )
+    .await?;
+
+    // The combined_predicate is already negated (it selects the rows TO DELETE).
+    let schema = task.schema_ref();
+    let bound_predicate = task.combined_predicate.bind(schema, false)?;
+
+    // Get the field ID mappings for the predicate.
+    let (iceberg_field_ids, field_id_map) = ArrowReader::build_field_id_set_and_map(
+        record_batch_stream_builder.parquet_schema(),
+        &bound_predicate,
+    )?;
+
+    // Create a row filter from the bound predicate.
+    let row_filter = ArrowReader::get_row_filter(
+        &bound_predicate,
+        record_batch_stream_builder.parquet_schema(),
+        &iceberg_field_ids,
+        &field_id_map,
+    )?;
+
+    // Apply the row filter so that only rows matching the equality delete predicates remain.
+    record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
+
+    record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
+
+    // Build the stream of filtered records.
+    let record_batch_stream = record_batch_stream_builder.build()?;
+
+    // Extract positions from the _pos column and emit delete batches.
+    let output_schema_clone = output_schema.clone();
+    let file_path_clone = file_path.clone();
+
+    let stream = record_batch_stream
+        .then(move |batch_result| {
+            let schema = output_schema_clone.clone();
+            let path = file_path_clone.clone();
+            async move {
+                match batch_result {
+                    Ok(batch) => {
+                        // Extract the _pos column (it should be the last column, due to virtual_columns).
+                        if let Some(pos_column) = batch
+                            .column(batch.num_columns() - 1)
+                            .as_any()
+                            .downcast_ref::<Int64Array>()
+                        {
+                            // Collect the positions from the _pos column.
+                            let positions: Vec<u64> = pos_column
+                                .iter()
+                                .filter_map(|v| v.map(|p| p as u64))
+                                .collect();
+
+                            // Create a delete batch with the matching positions.
+                            if !positions.is_empty() {
+                                create_delete_batch(&schema, &path, positions)
+                            } else {
+                                Ok(RecordBatch::new_empty(Arc::clone(&schema)))
+                            }
+                        } else {
+                            Err(Error::new(
+                                ErrorKind::Unexpected,
+                                "Failed to extract _pos column from equality delete batch",
+                            ))
+                        }
+                    }
+                    Err(e) => Err(e.into()),
+                }
+            }
+        })
+        .boxed();
+
+    Ok(Box::pin(stream) as ArrowRecordBatchStream)
+}
+
 impl StreamsInto<ArrowReader, CombinedIncrementalBatchRecordStream>
     for IncrementalFileScanTaskStreams
 {
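
A sketch of how the returned stream could be driven, assuming an `EqualityDeleteScanTask` and a `FileIO` are already in hand (the names match the new function above; the surrounding setup is omitted):

```rust
use futures::TryStreamExt;

// Sketch: consume the position batches emitted for one equality delete task.
let mut stream = process_equality_delete_task(task, Some(1024), file_io).await?;
while let Some(batch) = stream.try_next().await? {
    // Each non-empty batch carries two columns: the data file path (_file)
    // and the deleted row position (pos, Int64), one row per deleted row.
    println!("{} rows deleted in this batch", batch.num_rows());
}
```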
@@ -252,11 +383,11 @@ impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
         let (append_stream, delete_stream) = self;

         // Process append tasks
-        let file_io = reader.file_io.clone();
+        let file_io_append = reader.file_io.clone();
         spawn(async move {
             let _ = append_stream
                 .try_for_each_concurrent(reader.concurrency_limit_data_files, |append_task| {
-                    let file_io = file_io.clone();
+                    let file_io = file_io_append.clone();
                     let appends_tx = appends_tx.clone();
                     async move {
                         spawn(async move {
@@ -282,10 +413,12 @@ impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
         });

         // Process delete tasks
+        let file_io_delete = reader.file_io.clone();
         spawn(async move {
             let _ = delete_stream
                 .try_for_each_concurrent(reader.concurrency_limit_data_files, |delete_task| {
                     let deletes_tx = deletes_tx.clone();
+                    let file_io = file_io_delete.clone();
                     async move {
                         match delete_task {
                             DeleteScanTask::DeletedFile(deleted_file_task) => {
@@ -324,6 +457,23 @@ impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
                                 .await;
                             });
                         }
+                        DeleteScanTask::EqualityDeletes(equality_delete_task) => {
+                            spawn(async move {
+                                let record_batch_stream = process_equality_delete_task(
+                                    equality_delete_task,
+                                    batch_size,
+                                    file_io.clone(),
+                                )
+                                .await;
+
+                                process_record_batch_stream(
+                                    record_batch_stream,
+                                    deletes_tx,
+                                    "failed to read equality delete record batch",
+                                )
+                                .await;
+                            });
+                        }
                     }
                     Ok(())
                 }
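
`create_delete_batch`, called from `process_equality_delete_task`, is defined elsewhere in this file and not shown in the diff. As a rough mental model only, a stand-in consistent with the two-column output schema above could look like the following; the function name and error mapping here are illustrative, not the real implementation.

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef;
use iceberg::{Error, ErrorKind, Result};

// Hypothetical stand-in: one output row per deleted position, with the data
// file path repeated in the _file column and the position in the pos column.
fn create_delete_batch_sketch(
    schema: &SchemaRef,
    path: &str,
    positions: Vec<u64>,
) -> Result<RecordBatch> {
    let files = StringArray::from(vec![path; positions.len()]);
    let pos = Int64Array::from(positions.into_iter().map(|p| p as i64).collect::<Vec<i64>>());
    RecordBatch::try_new(Arc::clone(schema), vec![
        Arc::new(files) as ArrayRef,
        Arc::new(pos) as ArrayRef,
    ])
    .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))
}
```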