Skip to content

Commit 4182137

Browse files
authored
Merge pull request #74 from RelationalAI/gb/inc-scan-fix
fix(incremental-scan): treat EXISTING manifest entries as appends when from=None
2 parents 6c98380 + 2c75ac1 commit 4182137

7 files changed

Lines changed: 286 additions & 96 deletions

File tree

.github/workflows/bindings_python_ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ jobs:
9595
- uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6
9696
with:
9797
python-version: 3.12
98-
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
98+
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
9999
with:
100100
working-directory: "bindings/python"
101101
command: build

.github/workflows/codeql.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ jobs:
4646
persist-credentials: false
4747

4848
- name: Initialize CodeQL
49-
uses: github/codeql-action/init@c10b8064de6f491fea524254123dbe5e09572f13 # v4
49+
uses: github/codeql-action/init@c10b8064de6f491fea524254123dbe5e09572f13 # v4.35.1
5050
with:
5151
languages: actions
5252

5353
- name: Perform CodeQL Analysis
54-
uses: github/codeql-action/analyze@c10b8064de6f491fea524254123dbe5e09572f13 # v4
54+
uses: github/codeql-action/analyze@c10b8064de6f491fea524254123dbe5e09572f13 # v4.35.1
5555
with:
5656
category: "/language:actions"

.github/workflows/release_python.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,13 @@ jobs:
124124
env:
125125
NEEDS_VALIDATE_RELEASE_TAG_OUTPUTS_CARGO_VERSION: ${{ needs.validate-release-tag.outputs.cargo-version }}
126126

127-
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
127+
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
128128
with:
129129
working-directory: "bindings/python"
130130
command: sdist
131131
args: -o dist
132132
- name: Upload sdist
133-
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
133+
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
134134
with:
135135
name: wheels-sdist
136136
path: bindings/python/dist
@@ -184,15 +184,15 @@ jobs:
184184
uses: ./.github/actions/setup-builder
185185
with:
186186
rust-version: ${{ steps.get-msrv.outputs.msrv }}
187-
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
187+
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
188188
with:
189189
target: ${{ matrix.target }}
190190
manylinux: ${{ matrix.manylinux || 'auto' }}
191191
working-directory: "bindings/python"
192192
command: build
193193
args: --release -o dist -i python3.12 # Explicitly set interpreter; manylinux containers have multiple Pythons and maturin may pick an older one
194194
- name: Upload wheels
195-
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
195+
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
196196
with:
197197
name: wheels-${{ matrix.os }}-${{ matrix.target }}
198198
path: bindings/python/dist

.github/workflows/release_python_nightly.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@ jobs:
4848
with:
4949
timestamp: ${{ needs.set-version.outputs.TIMESTAMP }}
5050

51-
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
51+
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
5252
with:
5353
working-directory: "bindings/python"
5454
command: sdist
5555
args: -o dist
5656

5757
- name: Upload sdist
58-
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
58+
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
5959
with:
6060
name: wheels-sdist
6161
path: bindings/python/dist
@@ -98,7 +98,7 @@ jobs:
9898
with:
9999
rust-version: ${{ steps.get-msrv.outputs.msrv }}
100100

101-
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
101+
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
102102
with:
103103
target: ${{ matrix.target }}
104104
manylinux: ${{ matrix.manylinux || 'auto' }}
@@ -107,7 +107,7 @@ jobs:
107107
args: --release -o dist -i python3.12 # Explicitly set interpreter; manylinux containers have multiple Pythons and maturin may pick an older one
108108

109109
- name: Upload wheels
110-
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
110+
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
111111
with:
112112
name: wheels-${{ matrix.os }}-${{ matrix.target }}
113113
path: bindings/python/dist

crates/iceberg/src/scan/incremental/context.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ pub(crate) struct IncrementalPlanContext {
3838
/// The snapshot to start the incremental scan from.
3939
pub from_snapshot: SnapshotRef,
4040

41+
/// The user-provided from_snapshot_id (None means "scan from the beginning").
42+
/// Used to distinguish a true "from beginning" scan (full scan semantics) from
43+
/// an incremental scan with an explicit starting snapshot.
44+
pub from_snapshot_id: Option<i64>,
45+
4146
/// The metadata of the table being scanned.
4247
pub table_metadata: TableMetadataRef,
4348

@@ -67,13 +72,41 @@ impl IncrementalPlanContext {
6772
delete_file_idx: DeleteFileIndex,
6873
delete_file_tx: Sender<ManifestEntryContext>,
6974
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
70-
// Collect all snapshot IDs (all operation types are supported)
71-
let snapshot_ids: HashSet<i64> = self.snapshots.iter().map(|s| s.snapshot_id()).collect();
72-
7375
// Separate delete and data manifests to ensure deletes are processed first.
7476
// This prevents deadlock by ensuring delete processing completes
7577
// (and builds the delete filter) before data manifests are fetched.
76-
let (delete_manifests, data_manifests, filter_fn) = {
78+
let (delete_manifests, data_manifests, filter_fn) = if self.from_snapshot_id.is_none() {
79+
// When from=None, use full-scan semantics: read ALL manifests from to_snapshot's
80+
// manifest list without filtering by added_snapshot_id. This handles tables where
81+
// older snapshots have been expired — their data files appear as EXISTING entries
82+
// in later manifests and would otherwise be silently dropped.
83+
let to_snapshot = self.snapshots.first().expect("snapshots is non-empty");
84+
let manifest_list = self
85+
.object_cache
86+
.get_manifest_list(to_snapshot, &self.table_metadata)
87+
.await?;
88+
89+
let mut delete_manifests = HashSet::<ManifestFile>::new();
90+
let mut data_manifests = HashSet::<ManifestFile>::new();
91+
for entry in manifest_list.entries() {
92+
if entry.content == ManifestContentType::Deletes {
93+
delete_manifests.insert(entry.clone());
94+
} else {
95+
data_manifests.insert(entry.clone());
96+
}
97+
}
98+
99+
// No entry-level filtering: include entries from expired snapshots too.
100+
(
101+
delete_manifests,
102+
data_manifests,
103+
None::<Arc<ManifestEntryFilterFn>>,
104+
)
105+
} else {
106+
// Collect all snapshot IDs (all operation types are supported)
107+
let snapshot_ids: HashSet<i64> =
108+
self.snapshots.iter().map(|s| s.snapshot_id()).collect();
109+
77110
let mut delete_manifests = HashSet::<ManifestFile>::new();
78111
let mut data_manifests = HashSet::<ManifestFile>::new();
79112

crates/iceberg/src/scan/incremental/mod.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ impl<'a> IncrementalTableScanBuilder<'a> {
370370
let plan_context = IncrementalPlanContext {
371371
snapshots,
372372
from_snapshot: snapshot_from,
373+
from_snapshot_id: self.from_snapshot_id,
373374
table_metadata: self.table.metadata_ref(),
374375
to_snapshot_schema: schema,
375376
object_cache: self.table.object_cache().clone(),
@@ -504,8 +505,11 @@ impl IncrementalTableScan {
504505

505506
// Collect data files that were live at from_snapshot. These are needed to generate
506507
// equality delete tasks for files that predate the scan range.
507-
// Runs in parallel with the rest of the planning.
508-
let from_snapshot_collection = if all_deletes.iter().any(is_equality_delete) {
508+
// Skipped when from=None because in that case all data files are treated as appends
509+
// (full-scan semantics), so equality deletes are handled via AppendedFileScanTask.
510+
let from_snapshot_collection = if self.plan_context.from_snapshot_id.is_some()
511+
&& all_deletes.iter().any(is_equality_delete)
512+
{
509513
Some(Self::spawn_manifest_entry_collection(
510514
self.plan_context.from_snapshot.clone(),
511515
self.plan_context.table_metadata.clone(),
@@ -536,6 +540,7 @@ impl IncrementalTableScan {
536540
// Process the data file [`ManifestEntry`] stream in parallel
537541
let filter = delete_filter.clone();
538542
let table_metadata = self.plan_context.table_metadata.clone();
543+
let from_snapshot_is_none = self.plan_context.from_snapshot_id.is_none();
539544
spawn(async move {
540545
let result = manifest_entry_data_ctx_rx
541546
.map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
@@ -545,8 +550,13 @@ impl IncrementalTableScan {
545550
let filter = filter.clone();
546551
let table_metadata = table_metadata.clone();
547552
async move {
548-
if manifest_entry_context.manifest_entry.status()
549-
== ManifestStatus::Added
553+
let status = manifest_entry_context.manifest_entry.status();
554+
// When from=None (full-scan semantics), treat both Added and Existing
555+
// entries as appends. Existing entries arise from expired snapshots or
556+
// manifest rewrites — they represent live files that must be included.
557+
// Deleted entries are skipped since they are not live.
558+
if status == ManifestStatus::Added
559+
|| (from_snapshot_is_none && status == ManifestStatus::Existing)
550560
{
551561
spawn(async move {
552562
Self::process_data_manifest_entry(
@@ -558,9 +568,7 @@ impl IncrementalTableScan {
558568
.await
559569
})
560570
.await
561-
} else if manifest_entry_context.manifest_entry.status()
562-
== ManifestStatus::Deleted
563-
{
571+
} else if status == ManifestStatus::Deleted && !from_snapshot_is_none {
564572
spawn(async move {
565573
Self::process_deleted_data_manifest_entry(
566574
tx,

0 commit comments

Comments
 (0)