Merged

43 commits
0e4e526
[incremental scan] Support for equality deletes
gbrgr Mar 5, 2026
c910e94
Clippy fix
gbrgr Mar 5, 2026
c1fa1f8
Refactor
gbrgr Mar 5, 2026
aab0b15
.
gbrgr Mar 5, 2026
34d9159
Small optimization
gbrgr Mar 5, 2026
f972615
Clippy fix
gbrgr Mar 5, 2026
947f055
Fix deadlock
gbrgr Mar 5, 2026
a98e1ce
Add test
gbrgr Mar 5, 2026
f6c871c
Code reuse
gbrgr Mar 6, 2026
3e62a7d
Add test
gbrgr Mar 6, 2026
48e3b10
Cleanup
gbrgr Mar 6, 2026
585e0bc
Some PR comments
gbrgr Mar 6, 2026
54ea7d6
Enable row group filtering
gbrgr Mar 6, 2026
eef647d
PR comments
gbrgr Mar 6, 2026
a0f211d
Factor out common code
gbrgr Mar 6, 2026
4907990
Clippy fix
gbrgr Mar 6, 2026
e58b4bf
Tighten method visibility
gbrgr Mar 6, 2026
897abf6
.
gbrgr Mar 6, 2026
80a20f7
Unify some more
gbrgr Mar 6, 2026
5d1b3ca
Format
gbrgr Mar 6, 2026
4f51fa7
.
gbrgr Mar 6, 2026
f64d82e
Unify more
gbrgr Mar 6, 2026
d5453ce
Unify more, add partition info to append task
gbrgr Mar 6, 2026
19826d1
.
gbrgr Mar 6, 2026
2eaba7f
Unify predicate application
gbrgr Mar 8, 2026
379a27d
.
gbrgr Mar 8, 2026
09de39d
Allow in tests for multiple eq columns
gbrgr Mar 8, 2026
7cad600
Add test
gbrgr Mar 9, 2026
411147d
Remove rewrite
gbrgr Mar 9, 2026
936efa4
Add case_sensitive
gbrgr Mar 9, 2026
d79ad18
.
gbrgr Mar 9, 2026
940be35
Add helper
gbrgr Mar 9, 2026
4122d6e
Format
gbrgr Mar 9, 2026
33a0288
Fix clippy
gbrgr Mar 9, 2026
a779e05
Some PR comments
gbrgr Mar 9, 2026
08fe33f
Make big helper
gbrgr Mar 9, 2026
f7a142a
Split up more
gbrgr Mar 9, 2026
ee5fc70
Inline small helpers
gbrgr Mar 9, 2026
fab3efa
Reinsert comment
gbrgr Mar 9, 2026
7f11c4a
Narrow visibility
gbrgr Mar 9, 2026
ca2d37e
Remove mut
gbrgr Mar 9, 2026
bef812d
.
gbrgr Mar 9, 2026
b59b570
Format
gbrgr Mar 9, 2026
46 changes: 33 additions & 13 deletions crates/iceberg/src/arrow/delete_filter.rs
@@ -204,22 +204,15 @@ impl DeleteFilter {
}
}

/// Builds eq delete predicate for the provided task.
pub(crate) async fn build_equality_delete_predicate(
/// Builds a combined predicate from a list of equality delete files.
/// Retrieves the predicate for each file and ANDs them all together.
pub(crate) async fn build_combined_equality_delete_predicate(
&self,
file_scan_task: &FileScanTask,
) -> Result<Option<BoundPredicate>> {
// * Filter the task's deletes into just the Equality deletes
// * Retrieve the unbound predicate for each from self.state.equality_deletes
// * Logical-AND them all together to get a single combined `Predicate`
// * Bind the predicate to the task's schema to get a `BoundPredicate`

equality_delete_files: &[FileScanTaskDeleteFile],
) -> Result<Option<Predicate>> {
let mut combined_predicate = AlwaysTrue;
for delete in &file_scan_task.deletes {
if !is_equality_delete(delete) {
continue;
}

for delete in equality_delete_files {
let Some(predicate) = self
.get_equality_delete_predicate_for_delete_file_path(&delete.file_path)
.await
@@ -240,6 +233,33 @@
return Ok(None);
}

Ok(Some(combined_predicate))
}

/// Builds eq delete predicate for the provided task.
pub(crate) async fn build_equality_delete_predicate(
&self,
file_scan_task: &FileScanTask,
) -> Result<Option<BoundPredicate>> {
// Filter the task's deletes into just the Equality deletes
let equality_deletes: Vec<FileScanTaskDeleteFile> = file_scan_task
.deletes
.iter()
.filter(|delete| is_equality_delete(delete))
.cloned()
.collect();

if equality_deletes.is_empty() {
return Ok(None);
}

let Some(combined_predicate) = self
.build_combined_equality_delete_predicate(&equality_deletes)
.await?
else {
return Ok(None);
};

let bound_predicate = combined_predicate
.bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?;
Ok(Some(bound_predicate))
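
Note (illustration only): the helper above folds the per-file predicates together with AND, starting from AlwaysTrue, and leaves binding to the caller. A minimal sketch of that shape against the crate's public expression API follows; the column names, values, and the not_equal_to calls are hypothetical stand-ins for whatever get_equality_delete_predicate_for_delete_file_path actually returns.

use iceberg::expr::{Predicate, Reference};
use iceberg::spec::Datum;

// Hypothetical per-file predicates; the real ones are looked up per delete file.
let per_file = vec![
    Reference::new("id").not_equal_to(Datum::long(42)),
    Reference::new("category").not_equal_to(Datum::string("obsolete")),
];

// Fold with AND, starting from AlwaysTrue, as build_combined_equality_delete_predicate does.
let mut combined = Predicate::AlwaysTrue;
for p in per_file {
    combined = combined.and(p);
}

// The caller then binds the unbound predicate against the task schema:
// let bound = combined.bind(schema, /* case_sensitive */ true)?;
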
256 changes: 171 additions & 85 deletions crates/iceberg/src/arrow/incremental.rs
@@ -23,20 +23,18 @@ use arrow_schema::Schema as ArrowSchema;
use futures::channel::mpsc::channel;
use futures::stream::select;
use futures::{Stream, StreamExt, TryStreamExt};
use parquet::arrow::arrow_reader::ArrowReaderOptions;

use crate::arrow::reader::process_record_batch_stream;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::{ArrowReader, StreamsInto};
use crate::delete_vector::DeleteVector;
use crate::expr::Bind;
use crate::io::FileIO;
use crate::metadata_columns::{RESERVED_FIELD_ID_POS, row_pos_field};
use crate::metadata_columns::row_pos_field;
use crate::runtime::spawn;
use crate::scan::ArrowRecordBatchStream;
use crate::scan::incremental::{
AppendedFileScanTask, DeleteScanTask, IncrementalFileScanTaskStreams,
AppendedFileScanTask, DeleteScanTask, EqualityDeleteScanTask, IncrementalFileScanTaskStreams,
};
use crate::spec::{Datum, PrimitiveType};
use crate::{Error, ErrorKind, Result};

/// Default batch size for incremental delete operations.
Expand Down Expand Up @@ -64,89 +62,52 @@ async fn process_incremental_append_task(
file_io: FileIO,
metadata_size_hint: Option<usize>,
) -> Result<ArrowRecordBatchStream> {
let mut virtual_columns = Vec::new();

// Check if _pos column is requested and add it as a virtual column
let has_pos_column = task.base.project_field_ids.contains(&RESERVED_FIELD_ID_POS);
if has_pos_column {
// Add _pos as a virtual column to be produced by the Parquet reader
virtual_columns.push(Arc::clone(row_pos_field()));
}

let arrow_reader_options = if !virtual_columns.is_empty() {
Some(ArrowReaderOptions::new().with_virtual_columns(virtual_columns.clone())?)
} else {
None
};

let mut record_batch_stream_builder = ArrowReader::create_parquet_record_batch_stream_builder(
&task.base.data_file_path,
let AppendedFileScanTask {
base,
positional_deletes,
equality_delete_predicate,
} = task;

let should_load_page_index =
equality_delete_predicate.is_some() || positional_deletes.is_some();
let equality_delete_bound = equality_delete_predicate
.map(|p| p.bind(base.schema.clone(), base.case_sensitive))
.transpose()?;

let (builder, has_missing_field_ids) = ArrowReader::open_parquet_stream_builder(
&base.data_file_path,
base.file_size_in_bytes,
file_io,
true,
arrow_reader_options,
should_load_page_index,
ArrowReader::build_virtual_columns(&base.project_field_ids),
metadata_size_hint,
task.base.file_size_in_bytes,
batch_size,
None, // name_mapping not yet supported in incremental scan
)
.await?;

// Create a projection mask for the batch stream to select which columns in the
// Parquet file that we want in the response
let projection_mask = ArrowReader::get_arrow_projection_mask(
&task.base.project_field_ids,
&task.schema_ref(),
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
false, // use_fallback
let builder = ArrowReader::apply_parquet_filters(
builder,
base.start,
base.length,
&base.schema,
equality_delete_bound.as_ref(),
positional_deletes.as_deref(),
true, // row_group_filtering_enabled
true, // row_selection_enabled
false, // use_predicate_projection: projection applied separately via build_projected_record_batch_stream
has_missing_field_ids,
)?;
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

// RecordBatchTransformer performs any transformations required on the RecordBatches
// that come back from the file, such as type promotion, default column insertion,
// column re-ordering, and virtual field addition (like _file)
let datum = Datum::new(
PrimitiveType::String,
crate::spec::PrimitiveLiteral::String(task.base.data_file_path.clone()),
);
let mut record_batch_transformer_builder =
RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids)
.with_constant(crate::metadata_columns::RESERVED_FIELD_ID_FILE, datum);

if has_pos_column {
record_batch_transformer_builder =
record_batch_transformer_builder.with_virtual_field(Arc::clone(row_pos_field()))?;
}

let mut record_batch_transformer = record_batch_transformer_builder.build();

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
}

// Apply positional deletes as row selections.
let row_selection = if let Some(positional_delete_indexes) = task.positional_deletes {
Some(ArrowReader::build_deletes_row_selection(
record_batch_stream_builder.metadata().row_groups(),
&None,
&positional_delete_indexes.lock().unwrap(),
)?)
} else {
None
};

if let Some(row_selection) = row_selection {
record_batch_stream_builder = record_batch_stream_builder.with_row_selection(row_selection);
}

// Build the batch stream and send all the RecordBatches that it generates
// to the requester.
let record_batch_stream = record_batch_stream_builder
.build()?
.map(move |batch| match batch {
Ok(batch) => record_batch_transformer.process_record_batch(batch),
Err(err) => Err(err.into()),
});

Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
ArrowReader::build_projected_record_batch_stream(
builder,
&base.project_field_ids,
base.schema,
has_missing_field_ids,
&base.data_file_path,
base.partition_spec,
base.partition,
)
}

/// Helper function to create a RecordBatch from a chunk of position values.
Expand Down Expand Up @@ -183,7 +144,7 @@ fn process_incremental_delete_task(
) -> Result<ArrowRecordBatchStream> {
let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);

// Create schema with _file column first, then pos (Int64)
// Create schema with file_path column first, then pos (Int64)
let schema = Arc::new(ArrowSchema::new(vec![
Arc::clone(crate::metadata_columns::file_path_field()),
Arc::clone(crate::metadata_columns::pos_field_arrow()),
@@ -205,7 +166,7 @@ fn process_incremental_deleted_file_task(
) -> Result<ArrowRecordBatchStream> {
let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);

// Create schema with _file column first, then pos (Int64)
// Create schema with file_path column first, then pos (Int64)
let schema = Arc::new(ArrowSchema::new(vec![
Arc::clone(crate::metadata_columns::file_path_field()),
Arc::clone(crate::metadata_columns::pos_field_arrow()),
@@ -219,6 +180,111 @@
Ok(Box::pin(stream) as ArrowRecordBatchStream)
}

/// Processes an equality delete task by reading the data file with equality delete predicates applied
/// as a row filter, and emitting record batches containing matching row positions.
async fn process_equality_delete_task(
task: EqualityDeleteScanTask,
batch_size: Option<usize>,
file_io: FileIO,
metadata_size_hint: Option<usize>,
) -> Result<ArrowRecordBatchStream> {
let file_path = task.data_file_path().to_string();

// Create output schema with file_path column first, then pos (Int64)
let output_schema = Arc::new(ArrowSchema::new(vec![
Arc::clone(crate::metadata_columns::file_path_field()),
Arc::clone(crate::metadata_columns::pos_field_arrow()),
]));

let bound_predicate = task
.combined_predicate
.bind(task.schema_ref(), task.base.case_sensitive)?;

let (builder, has_missing_field_ids) = ArrowReader::open_parquet_stream_builder(
&task.base.data_file_path,
task.base.file_size_in_bytes,
file_io,
true, // always load page index: we always have a predicate
vec![Arc::clone(row_pos_field())],
metadata_size_hint,
batch_size,
None, // name_mapping not yet supported in incremental scan
)
.await?;

let builder = ArrowReader::apply_parquet_filters(
builder,
task.base.start,
task.base.length,
&task.base.schema,
Some(&bound_predicate),
None, // no positional deletes for equality delete tasks
true, // row_group_filtering_enabled
false, // row_selection_enabled
true, // use_predicate_projection: project to predicate columns only
has_missing_field_ids,
)?;

// Build the stream of filtered records
let record_batch_stream = builder.build()?;

// Extract positions from the _pos column and emit delete batches
let output_schema_clone = output_schema.clone();
let file_path_clone = file_path.clone();

let stream = record_batch_stream
.then(move |batch_result| {
let schema = output_schema_clone.clone();
let path = file_path_clone.clone();
async move {
match batch_result {
Ok(batch) => {
// Extract _pos column (last column due to virtual_columns).
// _pos is always non-null: it is a virtual column representing the
// physical row position, produced by the Parquet reader for every row.
let pos_col = batch
.column(batch.num_columns() - 1)
.as_any()
.downcast_ref::<Int64Array>()
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"Failed to extract _pos column from equality delete batch",
)
})?;

let num_rows = pos_col.len();
if num_rows == 0 {
return Ok(RecordBatch::new_empty(Arc::clone(&schema)));
}

// Reuse the Int64Array directly as the pos column.
// Build a matching file_path StringArray.
let file_array = Arc::new(arrow_array::StringArray::from(vec![path.as_str(); num_rows]));
RecordBatch::try_new(Arc::clone(&schema), vec![
file_array,
Arc::clone(batch.column(batch.num_columns() - 1)),
])
.map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to create equality delete batch: {e}"),
)
})
}
Err(e) => Err(e.into()),
}
}
})
.boxed();

Ok(Box::pin(stream) as ArrowRecordBatchStream)
}
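
Note (illustration only): the delete batches emitted above use a two-column file_path / pos layout taken from the crate's metadata column helpers. A self-contained arrow-rs sketch of building one such batch follows; the field definitions, path, and position values are made up and merely stand in for file_path_field() / pos_field_arrow() and real reader output.

use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn example_delete_batch() -> Result<RecordBatch, ArrowError> {
    // Two columns: the data file path (repeated per row) and the physical
    // row positions treated as deleted in that file.
    let schema = Arc::new(Schema::new(vec![
        Field::new("file_path", DataType::Utf8, false),
        Field::new("pos", DataType::Int64, false),
    ]));

    let file_path = "s3://bucket/table/data/part-00000.parquet";
    let positions: Vec<i64> = vec![3, 17, 42];

    let file_array: ArrayRef = Arc::new(StringArray::from(vec![file_path; positions.len()]));
    let pos_array: ArrayRef = Arc::new(Int64Array::from(positions));

    RecordBatch::try_new(schema, vec![file_array, pos_array])
}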

impl StreamsInto<ArrowReader, CombinedIncrementalBatchRecordStream>
for IncrementalFileScanTaskStreams
{
Expand Down Expand Up @@ -252,11 +318,11 @@ impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
let (append_stream, delete_stream) = self;

// Process append tasks
let file_io = reader.file_io.clone();
let file_io_append = reader.file_io.clone();
spawn(async move {
let _ = append_stream
.try_for_each_concurrent(reader.concurrency_limit_data_files, |append_task| {
let file_io = file_io.clone();
let file_io = file_io_append.clone();
let appends_tx = appends_tx.clone();
async move {
spawn(async move {
@@ -282,10 +348,12 @@ impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
});

// Process delete tasks
let file_io_delete = reader.file_io.clone();
spawn(async move {
let _ = delete_stream
.try_for_each_concurrent(reader.concurrency_limit_data_files, |delete_task| {
let deletes_tx = deletes_tx.clone();
let file_io = file_io_delete.clone();
async move {
match delete_task {
DeleteScanTask::DeletedFile(deleted_file_task) => {
Expand Down Expand Up @@ -324,6 +392,24 @@ impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
.await;
});
}
DeleteScanTask::EqualityDeletes(equality_delete_task) => {
spawn(async move {
let record_batch_stream = process_equality_delete_task(
equality_delete_task,
batch_size,
file_io.clone(),
metadata_size_hint,
)
.await;

process_record_batch_stream(
record_batch_stream,
deletes_tx,
"failed to read equality delete record batch",
)
.await;
});
}
}
Ok(())
}
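
Note (illustration only): both StreamsInto impls follow the same fan-out shape: a producer drives each scan-task stream with try_for_each_concurrent, every task gets its own Sender clone, and a consumer drains the channel. A standalone sketch of that pattern with the futures crate is below; the item type, channel capacity, and the doubling "work" are placeholders.

use futures::channel::mpsc::channel;
use futures::{SinkExt, StreamExt, TryStreamExt, join, stream};

async fn fan_out(items: Vec<Result<u64, std::io::Error>>, concurrency: usize) -> Vec<u64> {
    let (tx, rx) = channel::<u64>(16);

    // Producer: process up to `concurrency` items at once; each item gets
    // its own Sender clone, mirroring how append/delete tasks are handled.
    let producer = async move {
        let _ = stream::iter(items)
            .try_for_each_concurrent(concurrency, |item| {
                let mut tx = tx.clone();
                async move {
                    let _ = tx.send(item * 2).await; // placeholder "work"
                    Ok(())
                }
            })
            .await;
        // All Sender clones are gone once this block ends, which closes the
        // channel and lets the consumer's collect() finish.
    };

    // Consumer: drain the receiver into a Vec.
    let consumer = rx.collect::<Vec<_>>();
    let (_, out) = join!(producer, consumer);
    out
}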