
Add Iceberg incremental read via CHANGES AT(VERSION) END(VERSION)#4262

Open
sfc-gh-igarish wants to merge 2 commits into main from igarish/iceberg-incremental-read-changes

@sfc-gh-igarish

Collaborator

Spark Iceberg incremental reads use start-snapshot-id / end-snapshot-id reader options. Snowflake expresses the same semantics as:

  SELECT * FROM t
    CHANGES (INFORMATION => APPEND_ONLY)
    AT (VERSION => start)
    [ END (VERSION => end) ]

Add IcebergChangesConfig and wire it through DataFrameReader.table(), Session.table(), Table, and SnowflakeTable plan generation. The Spark option aliases (dashed and underscored) are accepted and validated to be mutually exclusive with existing time-travel reader options.
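
The option handling described above could look roughly like the sketch below. The alias tuples follow the dashed and underscored Spark names; the `TIME_TRAVEL_OPTIONS` set, the case-insensitive matching, and the function names are assumptions for illustration, and the real Snowpark time-travel option names may differ. The end-only error message reuses the wording quoted in the review thread.

```python
from typing import Any, Dict, Optional, Tuple

# Spark-style aliases accepted for each bound (dashed and underscored).
START_ALIASES = ("start-snapshot-id", "start_snapshot_id")
END_ALIASES = ("end-snapshot-id", "end_snapshot_id")

# Hypothetical stand-in for the existing time-travel reader options;
# the real option names in Snowpark may differ.
TIME_TRAVEL_OPTIONS = {"time_travel_mode", "statement", "offset", "timestamp"}


def _first_present(options: Dict[str, Any], aliases: Tuple[str, ...]) -> Any:
    """Return the value of the first alias present in options, else None."""
    for name in aliases:
        if name in options:
            return options[name]
    return None


def extract_incremental_read(options: Dict[str, Any]) -> Optional[Tuple[Any, Any]]:
    """Return (start, end) for an incremental read, or None if not requested."""
    # Assumption: option names are matched case-insensitively.
    normalized = {key.lower(): value for key, value in options.items()}
    start = _first_present(normalized, START_ALIASES)
    end = _first_present(normalized, END_ALIASES)
    if start is None and end is None:
        return None  # ordinary (non-incremental) read
    if start is None:
        raise ValueError(
            "Iceberg incremental read requires 'start-snapshot-id'; "
            "'end-snapshot-id' cannot be used alone."
        )
    conflicts = sorted(TIME_TRAVEL_OPTIONS.intersection(normalized))
    if conflicts:
        raise ValueError(
            "Iceberg incremental read cannot be combined with "
            f"time-travel options: {conflicts}"
        )
    return start, end
```

Keeping the alias lookup and the exclusivity check in one function would let `DataFrameReader.table()` and `Session.table()` share the same validation instead of duplicating it.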

Tests cover SQL clause generation and option extraction.

  1. Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.

    Fixes SNOW-NNNNNNN

  2. Fill out the following pre-review checklist:

    • I am adding new automated test(s) to verify the correctness of my new code
      • If this test skips Local Testing mode, I'm requesting review from @snowflakedb/local-testing
    • I am adding new logging messages
    • I am adding a new telemetry message
    • I am adding new credentials
    • I am adding a new dependency
    • If this is a new feature/behavior, I'm adding the Local Testing parity changes.
    • I acknowledge that I have ensured my changes are thread-safe. Follow the link for more information: Thread-safe Developer Guidelines
    • If adding any arguments to public Snowpark APIs or creating new public Snowpark APIs, I acknowledge that I have ensured my changes include AST support. Follow the link for more information: AST Support Guidelines
  3. Please describe how your code solves the related issue.

    Please write a short description of how your code change solves the related issue.


Co-authored-by: Cursor <cursoragent@cursor.com>
Mirror the snapshot-id / version_tag integration test pattern:

- test_iceberg_incremental_read_session_table_kwargs compares
  Session.table(start_snapshot_id=..., end_snapshot_id=...) against
  raw CHANGES ... AT(VERSION => ...) END(VERSION => ...) SQL.
- test_iceberg_incremental_read_dataframe_reader_option checks the
  Spark option aliases match the kwargs path and asserts the emitted
  SQL contains the CHANGES clause.
- test_iceberg_incremental_read_start_snapshot_only covers the
  start-only path (no END clause).

All three are marked @pytest.mark.skip by default because they share the
CLD + FEATURE_ICEBERG_TIME_TRAVEL prerequisites of the snapshot-id tests;
run them manually against sfctest0 / CLDUNITY.scosschema.snapshot_demo.

Co-authored-by: Cursor <cursoragent@cursor.com>
@sfc-gh-yuwang

Collaborator

Is this a public-facing change? If so, can you add a changelog entry in the corresponding section?

@codecov-commenter


Codecov Report

❌ Patch coverage is 54.45545% with 46 lines in your changes missing coverage. Please review.
✅ Project coverage is 95.40%. Comparing base (9975596) to head (2de241b).
⚠️ Report is 1 commit behind head on main.

Files with missing lines                               Patch %  Lines
src/snowflake/snowpark/dataframe_reader.py             46.93%   24 Missing and 2 partials ⚠️
src/snowflake/snowpark/_internal/utils.py              68.75%   10 Missing ⚠️
...lake/snowpark/_internal/analyzer/snowflake_plan.py  20.00%   2 Missing and 2 partials ⚠️
src/snowflake/snowpark/table.py                        50.00%   2 Missing and 2 partials ⚠️
...ke/snowpark/_internal/analyzer/select_statement.py  33.33%   1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4262      +/-   ##
==========================================
- Coverage   95.49%   95.40%   -0.10%     
==========================================
  Files         171      171              
  Lines       44243    44340      +97     
  Branches     7548     7574      +26     
==========================================
+ Hits        42252    42304      +52     
- Misses       1226     1264      +38     
- Partials      765      772       +7     



def __copy__(self) -> "Table":
    kwargs = {}
    if self._time_travel_config:

Collaborator

It looks like time_travel_config is the only thing forwarded when copy() is called. Should we also forward the newly added parameter in this PR, or is this expected behavior?
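
One way to address the copy() question, sketched on a hypothetical stand-in class: `TableSketch`, the `IcebergChangesConfig` fields, and the `_iceberg_changes_config` attribute name are assumptions, not the PR's actual code. The idea is simply that `__copy__` forwards every reader config, so a copied table keeps its incremental-read settings.

```python
import copy
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class IcebergChangesConfig:
    # Hypothetical fields; the real config in the PR may differ.
    start_snapshot_id: int
    end_snapshot_id: Optional[int] = None


class TableSketch:
    """Hypothetical stand-in for snowpark.Table, reduced to its config state."""

    def __init__(self, name, time_travel_config=None, iceberg_changes_config=None):
        self.name = name
        self._time_travel_config = time_travel_config
        self._iceberg_changes_config = iceberg_changes_config

    def __copy__(self) -> "TableSketch":
        kwargs = {}
        if self._time_travel_config:
            kwargs["time_travel_config"] = self._time_travel_config
        # Forward the incremental-read config as well; without this, the
        # copy loses it and would read the whole table.
        if self._iceberg_changes_config:
            kwargs["iceberg_changes_config"] = self._iceberg_changes_config
        return TableSketch(self.name, **kwargs)


t = TableSketch("t", iceberg_changes_config=IcebergChangesConfig(100, 200))
t_copy = copy.copy(t)
```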

        f"'{option_name}' must be a 64-bit integer Iceberg snapshot id, "
        f"got {value!r}."
    ) from None
if isinstance(snapshot_id, bool):

Collaborator

I have a question about this part of the code:
After the first try block, it seems impossible for snapshot_id to be a bool here. Can you tell me what scenario this check is trying to guard against?
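
To illustrate the point, assuming the first try block coerces with `int(value)` (the full code is not shown here): `bool` is a subclass of `int` in Python, so `int(True)` succeeds and returns the plain int `1`, and an `isinstance(..., bool)` check on the result can never fire. To reject booleans, the guard has to inspect the raw input before coercion. `coerce_snapshot_id` below is a hypothetical helper, not the PR's `_coerce_snapshot_id`.

```python
def coerce_snapshot_id(option_name: str, value: object) -> int:
    """Coerce value to a signed 64-bit Iceberg snapshot id (hypothetical helper)."""
    message = (
        f"'{option_name}' must be a 64-bit integer Iceberg snapshot id, "
        f"got {value!r}."
    )
    # bool is a subclass of int, so this guard must inspect the raw value:
    # after int(value), True has already become the plain int 1.
    if isinstance(value, bool):
        raise ValueError(message)
    try:
        snapshot_id = int(value)
    except (TypeError, ValueError):
        raise ValueError(message) from None
    # Iceberg snapshot ids are longs, i.e. signed 64-bit integers.
    if not -(2**63) <= snapshot_id < 2**63:
        raise ValueError(message)
    return snapshot_id


# A bool check placed after coercion is dead code: int(True) is not a bool.
assert isinstance(int(True), bool) is False
```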

        "Iceberg incremental read requires 'start-snapshot-id'; "
        "'end-snapshot-id' cannot be used alone."
    )
try:

Collaborator

This part looks like the same logic as _coerce_snapshot_id introduced in utils. Is it possible to reuse it?


Labels

None yet

Projects

None yet


3 participants