Commit ece2b0f

Feature: Incremental Append Scan
Adds IncrementalAppendScan, which accumulates the data appended between two snapshots. Builds on the BaseScan / ManifestGroupPlanner refactor in sm/table-scan-refactor; split out of #3364 per reviewer request, reviving the work from #2235.

- Table.incremental_append_scan(...) builds an IncrementalAppendScan over the (from_snapshot_id_exclusive, to_snapshot_id_inclusive] range; StagedTable overrides it to raise, mirroring scan().
- Walks the append-only ancestors in the range, dedups the data manifests whose added_snapshot_id is in range (Set semantics via ManifestFile __eq__/__hash__), and filters manifest entries to ADDED-in-range via a new manifest_entry_filter on ManifestGroupPlanner.plan_files.
- Projects onto the table's current schema (not the snapshot schema).
- from_snapshot_id_exclusive is validated with is_parent_ancestor_of, so an expired start cursor is accepted as long as the lineage still passes through it; equal from/to is rejected. Adds the snapshot helpers ancestors_between_ids and is_parent_ancestor_of.
- Unit tests (validation, current-schema projection, type preservation, expired-from) and integration tests (append-only, non-append ignored, schema evolution within range, partition/metrics pruning, disconnected snapshots), plus the test_incremental_read provision fixture.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
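
The range walk and the start-cursor validation described in the commit message can be sketched with a toy in-memory model. This is not the code from this commit: the `Snapshot` dataclass, `ancestors_of`, and `append_snapshots_in_range` are hypothetical, and the signature given to `is_parent_ancestor_of` is an assumption. Only the helper's name and the (exclusive, inclusive] range semantics come from the message.

```python
from dataclasses import dataclass
from typing import Dict, Iterator, List, Optional


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical stand-in for a table snapshot (not the pyiceberg class)."""
    snapshot_id: int
    parent_id: Optional[int]
    operation: str  # e.g. "append", "delete", "overwrite"


def ancestors_of(snapshots: Dict[int, Snapshot], snapshot_id: int) -> Iterator[Snapshot]:
    """Yield the snapshot and its ancestors, newest first, while metadata exists."""
    current = snapshots.get(snapshot_id)
    while current is not None:
        yield current
        parent_id = current.parent_id
        current = snapshots.get(parent_id) if parent_id is not None else None


def is_parent_ancestor_of(snapshots: Dict[int, Snapshot], snapshot_id: int, ancestor_parent_id: int) -> bool:
    """True if some ancestor of snapshot_id names ancestor_parent_id as its parent.

    Assumed semantics: only parent pointers are inspected, so the start cursor
    may itself have been expired from the metadata.
    """
    return any(s.parent_id == ancestor_parent_id for s in ancestors_of(snapshots, snapshot_id))


def append_snapshots_in_range(
    snapshots: Dict[int, Snapshot], from_exclusive: int, to_inclusive: int
) -> List[Snapshot]:
    """Return the append snapshots in (from_exclusive, to_inclusive], newest first."""
    if from_exclusive == to_inclusive:
        raise ValueError("from and to snapshot ids must differ")
    if to_inclusive not in snapshots:
        raise ValueError(f"unknown end snapshot: {to_inclusive}")
    if not is_parent_ancestor_of(snapshots, to_inclusive, from_exclusive):
        raise ValueError(f"{from_exclusive} is not a parent ancestor of {to_inclusive}")

    result = []
    for snapshot in ancestors_of(snapshots, to_inclusive):
        if snapshot.snapshot_id == from_exclusive:
            break  # the start of the range is exclusive
        if snapshot.operation == "append":
            result.append(snapshot)  # non-append snapshots are skipped
    return result


# Lineage 1 <- 2 <- 3 (delete) <- 4
lineage = {
    s.snapshot_id: s
    for s in [
        Snapshot(1, None, "append"),
        Snapshot(2, 1, "append"),
        Snapshot(3, 2, "delete"),
        Snapshot(4, 3, "append"),
    ]
}
in_range_ids = [s.snapshot_id for s in append_snapshots_in_range(lineage, 1, 4)]  # [4, 2]

# Expiring snapshot 1 leaves the result unchanged, because snapshot 2 still
# records 1 as its parent.
expired = {k: v for k, v in lineage.items() if k != 1}
expired_ids = [s.snapshot_id for s in append_snapshots_in_range(expired, 1, 4)]  # [4, 2]
```

Checking parent pointers rather than looking up the start snapshot itself is what lets this sketch accept an expired cursor. A lookup-based check would reject it once its metadata is gone.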
1 parent 0064fc9 commit ece2b0f

6 files changed

Lines changed: 665 additions & 21 deletions

dev/provision.py

Lines changed: 34 additions & 0 deletions
@@ -395,3 +395,37 @@
 )
 spark.sql(f"ALTER TABLE {catalog_name}.default.test_empty_scan_ordered_str WRITE ORDERED BY id")
 spark.sql(f"INSERT INTO {catalog_name}.default.test_empty_scan_ordered_str VALUES 'a', 'c'")
+
+# Append scan fixture. Snapshots written:
+# 0: append (1, 'a')
+# 1: append (2, 'b')
+# 2: append (3, 'c'), (4, 'b')
+# 3: delete number=2
+# 4: append (5, 'd', 100) -- on evolved schema
+# 5: replace table -- lineage break
+spark.sql(
+    f"""
+    CREATE OR REPLACE TABLE {catalog_name}.default.test_incremental_read (
+        number integer,
+        letter string
+    )
+    USING iceberg
+    PARTITIONED BY (letter)
+    TBLPROPERTIES ('format-version'='2')
+    """
+)
+spark.sql(f"INSERT INTO {catalog_name}.default.test_incremental_read VALUES (1, 'a')")
+spark.sql(f"INSERT INTO {catalog_name}.default.test_incremental_read VALUES (2, 'b')")
+spark.sql(f"INSERT INTO {catalog_name}.default.test_incremental_read VALUES (3, 'c'), (4, 'b')")
+spark.sql(f"DELETE FROM {catalog_name}.default.test_incremental_read WHERE number = 2")
+spark.sql(f"ALTER TABLE {catalog_name}.default.test_incremental_read ADD COLUMN extra int")
+spark.sql(f"INSERT INTO {catalog_name}.default.test_incremental_read VALUES (5, 'd', 100)")
+spark.sql(
+    f"""
+    REPLACE TABLE {catalog_name}.default.test_incremental_read
+    USING iceberg
+    PARTITIONED BY (letter)
+    TBLPROPERTIES ('format-version'='2')
+    AS SELECT number, letter, extra FROM {catalog_name}.default.test_incremental_read
+    """
+)
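
To make the fixture's intent easier to follow, here is a toy model of its six snapshots and of what an append-only range read over them might return. It is a sketch under stated assumptions, not pyiceberg code and not actual test output. `ToySnapshot`, `FIXTURE`, and `appended_rows` are hypothetical names, the ids 0 to 5 are the labels from the fixture comment rather than real snapshot ids, and the operation string on snapshot 5 is a guess (only its missing parent matters here). Padding rows with `None` stands in for projection onto the current three-column schema.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class ToySnapshot:
    """Hypothetical model of one fixture snapshot and the rows it appended."""
    snapshot_id: int
    parent_id: Optional[int]
    operation: str
    added_rows: Tuple[tuple, ...] = ()


FIXTURE: Dict[int, ToySnapshot] = {
    s.snapshot_id: s
    for s in [
        ToySnapshot(0, None, "append", ((1, "a"),)),
        ToySnapshot(1, 0, "append", ((2, "b"),)),
        ToySnapshot(2, 1, "append", ((3, "c"), (4, "b"))),
        ToySnapshot(3, 2, "delete"),  # DELETE ... WHERE number = 2
        ToySnapshot(4, 3, "append", ((5, "d", 100),)),  # after ADD COLUMN extra
        # Assumed: REPLACE TABLE starts a new lineage, modelled as parent_id=None.
        ToySnapshot(5, None, "append"),
    ]
}


def appended_rows(
    snapshots: Dict[int, ToySnapshot], from_exclusive: int, to_inclusive: int, width: int = 3
) -> List[tuple]:
    """Rows appended in (from_exclusive, to_inclusive], oldest first, padded to `width` columns."""
    if from_exclusive == to_inclusive:
        raise ValueError("from and to snapshot ids must differ")

    # Walk back from the end snapshot until the start cursor is reached.
    chain = []
    current = snapshots.get(to_inclusive)
    while current is not None and current.snapshot_id != from_exclusive:
        chain.append(current)
        parent_id = current.parent_id
        current = snapshots.get(parent_id) if parent_id is not None else None
    if current is None:
        raise ValueError(f"{from_exclusive} is not an ancestor of {to_inclusive}")

    rows = []
    for snapshot in reversed(chain):
        if snapshot.operation != "append":
            continue  # deletes and other non-append operations are ignored
        for row in snapshot.added_rows:
            rows.append(row + (None,) * (width - len(row)))
    return rows


early = appended_rows(FIXTURE, 0, 2)
# [(2, 'b', None), (3, 'c', None), (4, 'b', None)]

across_delete = appended_rows(FIXTURE, 0, 4)
# [(2, 'b', None), (3, 'c', None), (4, 'b', None), (5, 'd', 100)]
```

In this model the delete in snapshot 3 does not remove `(2, 'b')` from the result, which matches the "non-append ignored" case named in the commit message. A range such as `appended_rows(FIXTURE, 4, 5)` raises `ValueError`, because snapshot 5 has no parent chain leading back to 4.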
