Add Iceberg incremental read via CHANGES AT(VERSION) END(VERSION)#4262
sfc-gh-igarish wants to merge 2 commits
Spark Iceberg incremental reads use start-snapshot-id / end-snapshot-id
reader options. Snowflake expresses the same semantics as:
SELECT * FROM t
CHANGES (INFORMATION => APPEND_ONLY)
AT (VERSION => start)
[ END (VERSION => end) ]
Add IcebergChangesConfig and wire it through DataFrameReader.table(),
Session.table(), Table, and SnowflakeTable plan generation. The Spark
option aliases (dashed and underscored) are accepted and validated to be
mutually exclusive with existing time-travel reader options.
Tests cover SQL clause generation and option extraction.
Co-authored-by: Cursor <cursoragent@cursor.com>
Mirror the snapshot-id / version_tag integration test pattern:
- test_iceberg_incremental_read_session_table_kwargs compares Session.table(start_snapshot_id=..., end_snapshot_id=...) against raw CHANGES ... AT(VERSION => ...) END(VERSION => ...) SQL.
- test_iceberg_incremental_read_dataframe_reader_option checks the Spark option aliases match the kwargs path and asserts the emitted SQL contains the CHANGES clause.
- test_iceberg_incremental_read_start_snapshot_only covers the start-only path (no END clause).
All three are @pytest.mark.skip by default, with the same CLD + FEATURE_ICEBERG_TIME_TRAVEL prerequisites as the snapshot-id tests; run manually against sfctest0 / CLDUNITY.scosschema.snapshot_demo.
Co-authored-by: Cursor <cursoragent@cursor.com>
Is this a public-facing change? If so, can you add a changelog entry in the corresponding section?
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4262      +/-   ##
==========================================
- Coverage   95.49%   95.40%   -0.10%
==========================================
  Files         171      171
  Lines       44243    44340      +97
  Branches     7548     7574      +26
==========================================
+ Hits        42252    42304      +52
- Misses       1226     1264      +38
- Partials      765      772       +7
def __copy__(self) -> "Table":
    kwargs = {}
    if self._time_travel_config:
It looks like time_travel_config is the only thing forwarded when copy() is called. Should we also forward the parameter added in this PR, or is this expected behavior?
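To make the concern concrete, here is a toy sketch of a `__copy__` that forwards both configs. `TableSketch` and the `_iceberg_changes_config` attribute name are assumptions for illustration; the real `Table` constructor and attribute names in this PR may differ.

```python
import copy
from typing import Any, Dict


class TableSketch:
    """Toy stand-in for Table, holding only the attributes relevant to copying."""

    def __init__(self, table_name, time_travel_config=None, iceberg_changes_config=None):
        self.table_name = table_name
        self._time_travel_config = time_travel_config
        # Attribute name is assumed; the PR may store the config differently.
        self._iceberg_changes_config = iceberg_changes_config

    def __copy__(self) -> "TableSketch":
        kwargs: Dict[str, Any] = {}
        if self._time_travel_config:
            kwargs["time_travel_config"] = self._time_travel_config
        # Without this branch a copied table would silently lose the
        # incremental-read bounds and scan the full table instead.
        if self._iceberg_changes_config:
            kwargs["iceberg_changes_config"] = self._iceberg_changes_config
        return TableSketch(self.table_name, **kwargs)
```

With this shape, `copy.copy(table)` returns a new object that carries the same incremental-read config as the original.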
        f"'{option_name}' must be a 64-bit integer Iceberg snapshot id, "
        f"got {value!r}."
    ) from None
if isinstance(snapshot_id, bool):
I have a question about this part of the code:
After the first try block, it looks impossible for snapshot_id to be a bool here. What scenario is this check guarding against?
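A small sketch of why the ordering matters, assuming the try block converts with `int(value)` (the diff excerpt does not show it). Since `int(True)` returns a plain `int`, a bool check on the converted value can never fire; rejecting bools has to happen on the raw input. The function name below is hypothetical and is not the PR's helper.

```python
def coerce_snapshot_id_sketch(value, option_name: str) -> int:
    """Hypothetical coercion helper: accept ints and int-like strings only."""
    # Check the raw value: bool is a subclass of int, so int(True) would
    # otherwise be accepted silently as snapshot id 1.
    if isinstance(value, bool):
        raise ValueError(
            f"'{option_name}' must be a 64-bit integer Iceberg snapshot id, "
            f"got {value!r}."
        )
    try:
        snapshot_id = int(value)
    except (TypeError, ValueError):
        raise ValueError(
            f"'{option_name}' must be a 64-bit integer Iceberg snapshot id, "
            f"got {value!r}."
        ) from None
    # Range check for a signed 64-bit value (an assumption about what
    # "64-bit integer" means in the error message).
    if not -(2**63) <= snapshot_id <= 2**63 - 1:
        raise ValueError(f"'{option_name}' is out of 64-bit range, got {value!r}.")
    return snapshot_id
```

If bools should be accepted as 0/1 instead, the check can simply be dropped; either way it has no effect when placed after the conversion.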
        "Iceberg incremental read requires 'start-snapshot-id'; "
        "'end-snapshot-id' cannot be used alone."
    )
try:
This looks like the same logic as _coerce_snapshot_id introduced in utils. Is it possible to reuse it?
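One way the reuse could look, as a rough sketch: a single coercion helper is called for both bounds, so the validation lives in one place. Everything here is illustrative. `_coerce`, `extract_incremental_options`, and the list of conflicting time-travel option names are assumptions; only the dashed/underscored aliases and the end-without-start error message come from this PR.

```python
from typing import Any, Dict, Optional, Tuple

START_ALIASES = ("start-snapshot-id", "start_snapshot_id")
END_ALIASES = ("end-snapshot-id", "end_snapshot_id")
# Assumed for illustration; the PR's actual list of time-travel options is not shown.
TIME_TRAVEL_OPTIONS_SKETCH = ("snapshot-id", "snapshot_id", "version_tag")


def _coerce(option_name: str, value: Any) -> int:
    """Stand-in for a shared helper such as _coerce_snapshot_id."""
    try:
        return int(value)
    except (TypeError, ValueError):
        raise ValueError(f"'{option_name}' must be an integer, got {value!r}.") from None


def _first_present(options: Dict[str, Any], aliases) -> Optional[Tuple[str, Any]]:
    """Return (name, value) for the first alias found in options, else None."""
    for name in aliases:
        if name in options:
            return name, options[name]
    return None


def extract_incremental_options(
    options: Dict[str, Any]
) -> Optional[Tuple[int, Optional[int]]]:
    """Return (start_id, end_id) or None when no incremental option is set."""
    start = _first_present(options, START_ALIASES)
    end = _first_present(options, END_ALIASES)
    if start is None and end is None:
        return None
    if start is None:
        raise ValueError(
            "Iceberg incremental read requires 'start-snapshot-id'; "
            "'end-snapshot-id' cannot be used alone."
        )
    conflicts = [name for name in TIME_TRAVEL_OPTIONS_SKETCH if name in options]
    if conflicts:
        raise ValueError(
            f"Incremental read cannot be combined with time-travel options {conflicts}."
        )
    # Both bounds go through the same helper instead of duplicating the try/except.
    start_id = _coerce(*start)
    end_id = _coerce(*end) if end is not None else None
    return start_id, end_id
```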
Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.
Fixes SNOW-NNNNNNN
Fill out the following pre-review checklist:
Please describe how your code solves the related issue.
Please write a short description of how your code change solves the related issue.