[incremental scan] Support for equality deletes #60

Merged
gbrgr merged 43 commits into main from gb/eq-deletes on Mar 9, 2026

Conversation

Collaborator

@gbrgr gbrgr commented Mar 5, 2026

@gbrgr gbrgr marked this pull request as ready for review March 5, 2026 15:22
@gbrgr gbrgr requested a review from vustef March 5, 2026 15:23
/// Spawns a concurrent task to collect all manifest entries from the `from_snapshot`.
/// Returns a receiver that will yield the manifest entries as they are collected.
/// Errors are sent through `error_tx`.
fn spawn_baseline_file_collection(
Collaborator Author

@vustef this is code to read metadata about the from_snapshot. I could not really reuse code from the full scan here, because the manifest reading logic is tightly coupled into plan_files. Should not hurt here though.

Collaborator

I don't see, though, how this function differs from any other function that reads metadata about a snapshot.

Collaborator

Could from_snapshot be a parameter, so that the function is not special and tied to the "baseline_file_collection"?

Collaborator Author

As said, the way metadata is read in the full scan is tightly coupled to the specific manifest file context types, etc.

Collaborator Author

Will add a parameter
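
The parameterization agreed on above could look roughly like the following. This is a minimal sketch, not the PR's code: `Snapshot`, `ManifestEntry`, and `spawn_file_collection` are hypothetical stand-ins, and a plain thread with `std::sync::mpsc` channels is swapped in for whatever async runtime the crate actually uses.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical stand-in for a snapshot: an ID plus the file paths its
/// manifests list. A real snapshot would load manifests from storage.
#[derive(Clone, Debug)]
pub struct Snapshot {
    pub id: i64,
    pub files: Vec<String>,
}

/// Hypothetical stand-in for a manifest entry.
#[derive(Debug, PartialEq)]
pub struct ManifestEntry {
    pub snapshot_id: i64,
    pub file_path: String,
}

/// Spawns a worker that streams the manifest entries of `snapshot`.
/// The snapshot is a parameter, so the function is not tied to the baseline.
/// Errors are sent through `error_tx` rather than the entry channel.
pub fn spawn_file_collection(
    snapshot: Snapshot,
    error_tx: Sender<String>,
) -> Receiver<ManifestEntry> {
    let (entry_tx, entry_rx) = channel();
    thread::spawn(move || {
        let Snapshot { id, files } = snapshot;
        for file_path in files {
            if file_path.is_empty() {
                // Report the problem and stop; the receiver sees the channel close.
                let _ = error_tx.send(format!("empty file path in snapshot {id}"));
                return;
            }
            let entry = ManifestEntry { snapshot_id: id, file_path };
            if entry_tx.send(entry).is_err() {
                // The receiver was dropped, so nobody is listening any more.
                return;
            }
        }
    });
    entry_rx
}
```

With this shape, the caller passes the `from_snapshot` for the baseline and could pass any other snapshot for a different collection pass, draining the returned receiver as entries arrive.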

if !equality_deletes.is_empty() {
// The predicate from build_combined_equality_delete_predicate is a "survival"
// filter (keeps non-deleted rows). Negate it to select rows TO DELETE.
let survival_predicate = delete_filter
Collaborator Author

Notice that for the full scan, the equality predicate is built at read time in the arrow reader. However, since the incremental scan builds the delete index beforehand, we can compile it here at plan time.
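
To illustrate the survival-versus-delete distinction in the snippet above, here is a small self-contained sketch. `Pred`, `survival_predicate`, and `rows_to_delete` are hypothetical names that do not reflect the iceberg crate's predicate API, and the sketch assumes integer columns only.

```rust
use std::collections::HashMap;

/// A row maps column names to integer values (kept deliberately simple).
pub type Row = HashMap<&'static str, i64>;

/// Hypothetical miniature predicate type, not the iceberg crate's own.
#[derive(Clone, Debug, PartialEq)]
pub enum Pred {
    AlwaysTrue,
    Eq(&'static str, i64),
    And(Box<Pred>, Box<Pred>),
    Not(Box<Pred>),
}

impl Pred {
    /// Evaluates the predicate against one row.
    pub fn eval(&self, row: &Row) -> bool {
        match self {
            Pred::AlwaysTrue => true,
            Pred::Eq(col, v) => row.get(col) == Some(v),
            Pred::And(l, r) => l.eval(row) && r.eval(row),
            Pred::Not(p) => !p.eval(row),
        }
    }
}

/// Builds the survival filter: a row survives if it matches none of the
/// delete rows. Each delete row is a conjunction of column equalities.
pub fn survival_predicate(delete_rows: &[Vec<(&'static str, i64)>]) -> Pred {
    delete_rows.iter().fold(Pred::AlwaysTrue, |acc, delete_row| {
        let matches_delete = delete_row.iter().fold(Pred::AlwaysTrue, |m, &(col, v)| {
            Pred::And(Box::new(m), Box::new(Pred::Eq(col, v)))
        });
        Pred::And(Box::new(acc), Box::new(Pred::Not(Box::new(matches_delete))))
    })
}

/// Negates the survival filter to select exactly the rows to delete.
pub fn rows_to_delete(survival: Pred) -> Pred {
    Pred::Not(Box::new(survival))
}
```

Because the incremental scan already holds the full delete index at plan time, a predicate of this kind can be compiled once there instead of being rebuilt in the reader.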

Collaborator

@vustef vustef left a comment

First round + will review tests later. Thanks Gerald

Comment thread crates/iceberg/src/arrow/delete_filter.rs
Comment thread crates/iceberg/src/arrow/delete_filter.rs
Comment thread crates/iceberg/src/arrow/incremental.rs Outdated
Comment thread crates/iceberg/src/arrow/incremental.rs Outdated
Comment thread crates/iceberg/src/arrow/incremental.rs Outdated
Comment thread crates/iceberg/src/scan/incremental/mod.rs Outdated
Comment thread crates/iceberg/src/scan/incremental/mod.rs Outdated
Comment thread crates/iceberg/src/scan/incremental/task.rs
Collaborator

@vustef vustef left a comment

Went through everything now. My main concern is code duplication between all these different tasks (full, and all incremental ones). Let's discuss after you go through comments

Comment thread crates/iceberg/src/arrow/incremental.rs Outdated
Comment thread crates/iceberg/src/arrow/incremental.rs Outdated
Comment thread crates/iceberg/src/arrow/incremental.rs
Comment thread crates/iceberg/src/arrow/incremental.rs Outdated
Comment thread crates/iceberg/src/arrow/delete_filter.rs
Comment thread crates/iceberg/src/scan/incremental/tests.rs Outdated
Comment thread crates/iceberg/src/scan/incremental/tests.rs Outdated
Comment thread crates/iceberg/src/scan/incremental/tests.rs
Comment thread crates/iceberg/src/scan/incremental/tests.rs
Comment thread crates/iceberg/src/scan/incremental/tests.rs Outdated
Comment thread crates/iceberg/src/arrow/incremental.rs Outdated
Comment thread crates/iceberg/src/arrow/incremental.rs Outdated
Comment thread crates/iceberg/src/arrow/incremental.rs Outdated
Comment thread crates/iceberg/src/arrow/reader.rs Outdated
Comment on lines +299 to +302
// Three-branch strategy matching Java's ReadConf constructor:
// Branch 1: file has embedded field IDs → use as-is
// Branch 2: name_mapping present → apply name mapping, reopen
// Branch 3: fallback → assign position-based IDs, reopen
Collaborator

Not sure if this is needed, it's stated in the function. Otherwise I expected to see this logic here
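
For readers following the thread, the three-branch strategy named in the code comment can be sketched as a small decision function. The names `FieldIdStrategy` and `choose_field_id_strategy` are hypothetical and only mirror the comment's wording; the actual reader code may structure this differently.

```rust
/// Hypothetical outcome of deciding how to resolve field IDs for a file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FieldIdStrategy {
    /// Branch 1: the file has embedded field IDs, use it as-is.
    UseEmbedded,
    /// Branch 2: a name mapping is present, apply it and reopen.
    ApplyNameMapping,
    /// Branch 3: fallback, assign position-based IDs and reopen.
    AssignPositional,
}

impl FieldIdStrategy {
    /// Branches 2 and 3 rewrite the schema, so the builder must be reopened.
    pub fn needs_reopen(self) -> bool {
        !matches!(self, FieldIdStrategy::UseEmbedded)
    }
}

/// Picks a branch in the order given by the comment: embedded IDs win,
/// then a name mapping, then the positional fallback.
pub fn choose_field_id_strategy(
    has_embedded_ids: bool,
    has_name_mapping: bool,
) -> FieldIdStrategy {
    if has_embedded_ids {
        FieldIdStrategy::UseEmbedded
    } else if has_name_mapping {
        FieldIdStrategy::ApplyNameMapping
    } else {
        FieldIdStrategy::AssignPositional
    }
}
```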

Comment thread crates/iceberg/src/arrow/reader.rs Outdated
) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
// Metadata fields (e.g. _file, _pos) are virtual — they don't exist as Parquet columns.
// Filter them out so get_arrow_projection_mask only sees real schema field IDs.
let real_field_ids: Vec<i32> = field_ids
Collaborator

nit: perhaps we should keep the names, in this case let project_field_ids_without_metadata, so that it's easier to correlate that with the previous code / upstream
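
A minimal sketch of the filtering step under discussion, using the suggested name. The two constants are assumptions made for illustration (reserved IDs counting down from `i32::MAX`); the crate's real constants and helper names may differ.

```rust
/// Assumed reserved ID of the virtual `_file` column (illustrative only).
pub const RESERVED_FIELD_ID_FILE: i32 = i32::MAX - 1;
/// Assumed reserved ID of the virtual `_pos` column (illustrative only).
pub const RESERVED_FIELD_ID_POS: i32 = i32::MAX - 2;

/// Returns true for virtual metadata fields, which have no Parquet column.
pub fn is_metadata_field(field_id: i32) -> bool {
    matches!(field_id, RESERVED_FIELD_ID_FILE | RESERVED_FIELD_ID_POS)
}

/// Drops metadata field IDs so that a projection mask is only built from
/// field IDs that exist in the file schema. Order is preserved.
pub fn project_field_ids_without_metadata(field_ids: &[i32]) -> Vec<i32> {
    field_ids
        .iter()
        .copied()
        .filter(|id| !is_metadata_field(*id))
        .collect()
}
```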

Comment thread crates/iceberg/src/arrow/reader.rs Outdated
Collaborator

I know this is the difference, but I think we can control this difference through a param, which might allow us to skip this step. We either get delete_predicate as a param in a common function, or we get a receiver.
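
One way to read this suggestion is a single parameter type that covers both cases. The sketch below is hypothetical: `DeleteSource` and `resolve` are invented names, and a blocking `std::sync::mpsc` receiver stands in for whatever channel type the reader actually uses.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical parameter for a shared reader function: the delete
/// predicate is either already compiled or still being produced.
pub enum DeleteSource<P> {
    /// Compiled beforehand, as the incremental scan does at plan time.
    Ready(Option<P>),
    /// Delivered later, as when the predicate is built at read time.
    Pending(Receiver<Option<P>>),
}

impl<P> DeleteSource<P> {
    /// Returns the predicate, waiting for it only in the `Pending` case.
    pub fn resolve(self) -> Result<Option<P>, String> {
        match self {
            DeleteSource::Ready(predicate) => Ok(predicate),
            DeleteSource::Pending(rx) => rx
                .recv()
                .map_err(|e| format!("delete predicate sender dropped: {e}")),
        }
    }
}
```

A common function taking a `DeleteSource` would let the full scan and the incremental tasks share everything after the point where the predicate is resolved.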

Comment thread crates/iceberg/src/arrow/reader.rs
Comment thread crates/iceberg/src/arrow/reader.rs Outdated
let (iceberg_field_ids, field_id_map) =
Self::build_field_id_set_and_map(builder.parquet_schema(), bound_predicate)?;

if let Some(use_fallback) = projection {
Collaborator

I don't get this, so incremental always uses fallback? Why? And why do we call this fallback?

Collaborator Author

Why should incremental always use this fallback? That's parameterized by has_missing_field_ids. From Claude:

So "fallback" is not about incremental code at all. It refers to position-based projection for migrated tables that lack embedded Parquet field IDs, versus the normal field-ID-based projection. It corresponds directly to has_missing_field_ids.

Breaking it down:
- projection = None → skip projection entirely (append task: projection is handled separately afterward via apply_projection)
- projection = Some(false) → apply field-ID-based projection (file has embedded field IDs, normal case)
- projection = Some(true) → apply position-based projection (file lacks field IDs, i.e. has_missing_field_ids = true, migrated table)
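
The three cases above are easy to misread as an `Option<bool>`. As a sketch of one possible alternative, not something the PR is known to do, an explicit enum could carry the same information; `ProjectionMode` and its methods are hypothetical names.

```rust
/// Hypothetical replacement for the `Option<bool>` projection flag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProjectionMode {
    /// Skip projection here; it is applied separately afterward.
    Skip,
    /// Project by embedded Parquet field IDs (the normal case).
    ByFieldId,
    /// Project by column position (file lacks field IDs, migrated table).
    ByPosition,
}

impl ProjectionMode {
    /// Translates the existing flag: `None`, `Some(false)`, `Some(true)`.
    pub fn from_flag(projection: Option<bool>) -> Self {
        match projection {
            None => ProjectionMode::Skip,
            Some(false) => ProjectionMode::ByFieldId,
            Some(true) => ProjectionMode::ByPosition,
        }
    }

    /// Picks the mode for a task that projects inside the reader.
    pub fn for_file(has_missing_field_ids: bool) -> Self {
        if has_missing_field_ids {
            ProjectionMode::ByPosition
        } else {
            ProjectionMode::ByFieldId
        }
    }
}
```

Named variants make it visible at the call site that the positional path depends on `has_missing_field_ids` and not on whether the scan is incremental.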

Comment thread crates/iceberg/src/arrow/reader.rs Outdated
Comment thread crates/iceberg/src/arrow/reader.rs Outdated
}
};

// There are three possible sources for potential lists of selected RowGroup indices,
Collaborator Author

TODO we lost this comment, will reinsert

Collaborator Author

Done

Collaborator

@vustef vustef left a comment

Thanks Gerald

Comment thread crates/iceberg/src/arrow/incremental.rs Outdated
@gbrgr gbrgr enabled auto-merge (squash) March 9, 2026 12:16
@gbrgr gbrgr merged commit 4c94e86 into main Mar 9, 2026
19 checks passed
@gbrgr gbrgr deleted the gb/eq-deletes branch March 9, 2026 12:20
2 participants