From 4d794a40376f5280b4647061a0f786ea6bc18659 Mon Sep 17 00:00:00 2001 From: Ilesh garish Date: Mon, 22 Jun 2026 23:22:27 +0530 Subject: [PATCH 1/2] Add Iceberg incremental read via CHANGES AT(VERSION) END(VERSION) Spark Iceberg incremental reads use start-snapshot-id / end-snapshot-id reader options. Snowflake expresses the same semantics as: SELECT * FROM t CHANGES (INFORMATION => APPEND_ONLY) AT (VERSION => start) [ END (VERSION => end) ] Add IcebergChangesConfig and wire it through DataFrameReader.table(), Session.table(), Table, and SnowflakeTable plan generation. The Spark option aliases (dashed and underscored) are accepted and validated to be mutually exclusive with existing time-travel reader options. Tests cover SQL clause generation and option extraction. Co-authored-by: Cursor --- .../_internal/analyzer/select_statement.py | 6 +- .../_internal/analyzer/snowflake_plan.py | 9 +- .../_internal/analyzer/snowflake_plan_node.py | 4 +- src/snowflake/snowpark/_internal/utils.py | 83 ++++++++++++++ src/snowflake/snowpark/dataframe_reader.py | 104 ++++++++++++++++++ src/snowflake/snowpark/session.py | 4 + src/snowflake/snowpark/table.py | 23 ++++ tests/unit/test_utils.py | 42 +++++++ 8 files changed, 271 insertions(+), 4 deletions(-) diff --git a/src/snowflake/snowpark/_internal/analyzer/select_statement.py b/src/snowflake/snowpark/_internal/analyzer/select_statement.py index 15db45a247..435fcccdb9 100644 --- a/src/snowflake/snowpark/_internal/analyzer/select_statement.py +++ b/src/snowflake/snowpark/_internal/analyzer/select_statement.py @@ -603,7 +603,11 @@ def __init__( # Metadata/Attributes for the plan self._attributes: Optional[List[Attribute]] = None self.table_reference = self.entity.name - if self.entity.time_travel_config is not None: + if self.entity.iceberg_changes_config is not None: + self.table_reference += ( + self.entity.iceberg_changes_config.generate_sql_clause() + ) + elif self.entity.time_travel_config is not None: self.table_reference += self.entity.time_travel_config.generate_sql_clause() def __deepcopy__(self, memodict={}) -> "SelectableEntity": # noqa: B006 diff --git a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py index 45cfbeec8c..98c4b5dd5e 100644 --- a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py +++ b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py @@ -1068,8 +1068,13 @@ def large_local_relation_plan( def table(self, table_name: str, source_plan: LogicalPlan) -> SnowflakePlan: table_reference = table_name - if isinstance(source_plan, SnowflakeTable) and source_plan.time_travel_config: - table_reference += source_plan.time_travel_config.generate_sql_clause() + if isinstance(source_plan, SnowflakeTable): + if source_plan.iceberg_changes_config: + table_reference += ( + source_plan.iceberg_changes_config.generate_sql_clause() + ) + elif source_plan.time_travel_config: + table_reference += source_plan.time_travel_config.generate_sql_clause() return self.query(project_statement([], table_reference), source_plan) diff --git a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan_node.py b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan_node.py index 0fc457ecf2..76a0a2ec8f 100644 --- a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan_node.py +++ b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan_node.py @@ -7,7 +7,7 @@ from enum import Enum from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple -from snowflake.snowpark._internal.utils import TimeTravelConfig +from snowflake.snowpark._internal.utils import IcebergChangesConfig, TimeTravelConfig from snowflake.snowpark._internal.analyzer.expression import Attribute, Expression from snowflake.snowpark._internal.analyzer.query_plan_analysis_utils import ( @@ -97,10 +97,12 @@ def __init__( session: "Session", is_temp_table_for_cleanup: bool = False, time_travel_config: Optional[TimeTravelConfig] = None, + iceberg_changes_config: Optional[IcebergChangesConfig] = None, ) -> None: super().__init__() self.name = name self.time_travel_config = time_travel_config + self.iceberg_changes_config = iceberg_changes_config # When `is_temp_table_for_cleanup` is True, it's a temp table # generated by Snowpark (currently only df.cache_result) under the hood # and users are not aware of it. diff --git a/src/snowflake/snowpark/_internal/utils.py b/src/snowflake/snowpark/_internal/utils.py index 638cb173e7..4eb8c318a8 100644 --- a/src/snowflake/snowpark/_internal/utils.py +++ b/src/snowflake/snowpark/_internal/utils.py @@ -2148,6 +2148,89 @@ def generate_sql_clause(self) -> str: return clause +class IcebergChangesConfig(NamedTuple): + """Configuration for Iceberg incremental reads via Snowflake ``CHANGES``. + + Spark Iceberg exposes incremental reads through:: + + spark.read.format("iceberg") + .option("start-snapshot-id", S1) + .option("end-snapshot-id", S2) # optional + .load("table") + + Snowflake translates this to:: + + SELECT * FROM table + CHANGES (INFORMATION => APPEND_ONLY) + AT (VERSION => S1) + [ END (VERSION => S2) ] + + When ``end_version`` is omitted, Snowflake uses the current snapshot as + the end of the change interval (same semantics as omitting ``END`` on + generic ``CHANGES`` queries). + """ + + start_version: int + end_version: Optional[int] = None + information: str = "APPEND_ONLY" + + @staticmethod + def _coerce_snapshot_id(value: object, option_name: str) -> int: + try: + snapshot_id = int(value) # type: ignore[arg-type] + except (TypeError, ValueError): + raise ValueError( + f"'{option_name}' must be a 64-bit integer Iceberg snapshot id, " + f"got {value!r}." + ) from None + if isinstance(snapshot_id, bool): + raise ValueError( + f"'{option_name}' must be a 64-bit integer Iceberg snapshot id, " + f"got {type(value).__name__}." + ) + return snapshot_id + + @staticmethod + def validate_and_normalize_params( + start_snapshot_id: Optional[int] = None, + end_snapshot_id: Optional[int] = None, + information: str = "APPEND_ONLY", + ) -> Optional["IcebergChangesConfig"]: + if start_snapshot_id is None and end_snapshot_id is None: + return None + if start_snapshot_id is None: + raise ValueError( + "Iceberg incremental read requires 'start-snapshot-id'; " + "'end-snapshot-id' cannot be used alone." + ) + start = IcebergChangesConfig._coerce_snapshot_id( + start_snapshot_id, "start-snapshot-id" + ) + end = None + if end_snapshot_id is not None: + end = IcebergChangesConfig._coerce_snapshot_id( + end_snapshot_id, "end-snapshot-id" + ) + info = information.upper() + if info not in ("APPEND_ONLY", "DEFAULT"): + raise ValueError( + "Iceberg incremental read 'information' must be 'APPEND_ONLY' " + f"or 'DEFAULT', got {information!r}." + ) + return IcebergChangesConfig( + start_version=start, end_version=end, information=info + ) + + def generate_sql_clause(self) -> str: + clause = ( + f" CHANGES (INFORMATION => {self.information}) " + f"AT (VERSION => {self.start_version})" + ) + if self.end_version is not None: + clause += f" END (VERSION => {self.end_version})" + return clause + + def get_line_numbers( commented_sql_query: str, child_uuids: List[str], diff --git a/src/snowflake/snowpark/dataframe_reader.py b/src/snowflake/snowpark/dataframe_reader.py index 16f4b641de..b929a56780 100644 --- a/src/snowflake/snowpark/dataframe_reader.py +++ b/src/snowflake/snowpark/dataframe_reader.py @@ -248,6 +248,81 @@ def _extract_time_travel_from_options(options: dict) -> dict: return result +def _get_reader_option(options: dict, *keys: str): + """Case-insensitive lookup for a reader option key.""" + for key in keys: + for option_key, value in options.items(): + if option_key.upper() == key.upper(): + return value + return None + + +def _extract_iceberg_changes_from_options(options: dict) -> dict: + """Extract Spark Iceberg incremental-read options from a reader dict. + + Maps ``start-snapshot-id`` / ``end-snapshot-id`` (and underscore + variants) to internal ``start_snapshot_id`` / ``end_snapshot_id`` + kwargs consumed by :meth:`Session.table`. + """ + start = _get_reader_option(options, "start-snapshot-id", "start_snapshot_id") + end = _get_reader_option(options, "end-snapshot-id", "end_snapshot_id") + if start is None and end is None: + return {} + if start is None: + raise ValueError( + "Iceberg incremental read requires 'start-snapshot-id'; " + "'end-snapshot-id' cannot be used alone." + ) + try: + start_id = int(start) + except (TypeError, ValueError): + raise ValueError( + "'start-snapshot-id' must be a 64-bit integer Iceberg snapshot id, " + f"got {start!r}." + ) from None + end_id = None + if end is not None: + try: + end_id = int(end) + except (TypeError, ValueError): + raise ValueError( + "'end-snapshot-id' must be a 64-bit integer Iceberg snapshot id, " + f"got {end!r}." + ) from None + return {"start_snapshot_id": start_id, "end_snapshot_id": end_id} + + +def _reader_options_conflict_with_incremental_read(options: dict) -> list[str]: + """Return reader option keys that cannot coexist with incremental read.""" + incremental_keys = { + "start-snapshot-id", + "start_snapshot_id", + "end-snapshot-id", + "end_snapshot_id", + } + if not any( + k.upper().replace("_", "-") in {x.replace("_", "-") for x in incremental_keys} + for k in options + ): + return [] + blocked = [] + for key in options: + upper = key.upper() + if upper in incremental_keys or upper.replace("_", "-") in { + x.replace("_", "-") for x in incremental_keys + }: + continue + if upper in _TIME_TRAVEL_OPTIONS_PARAMS_MAP or upper in ( + "SNAPSHOT-ID", + "SNAPSHOT_ID", + "AS-OF-TIMESTAMP", + "VERSION_TAG", + "VERSION-TAG", + ): + blocked.append(key) + return blocked + + class DataFrameReader: """Provides methods to load data in various supported formats from a Snowflake stage to a :class:`DataFrame`. The paths provided to the DataFrameReader must refer @@ -671,11 +746,29 @@ def table( # still pass them without us advertising the surface. version = kwargs.pop("version", None) version_tag = kwargs.pop("version_tag", None) + start_snapshot_id = kwargs.pop("start_snapshot_id", None) + end_snapshot_id = kwargs.pop("end_snapshot_id", None) if kwargs: raise TypeError( f"table() got unexpected keyword arguments: {sorted(kwargs)}" ) + changes_from_options = _extract_iceberg_changes_from_options(self._cur_options) + if changes_from_options: + conflicting = _reader_options_conflict_with_incremental_read( + self._cur_options + ) + if conflicting: + raise ValueError( + "Cannot combine Iceberg incremental read " + "('start-snapshot-id' / 'end-snapshot-id') with time travel " + f"options on the same read; found {conflicting!r}." + ) + if start_snapshot_id is None: + start_snapshot_id = changes_from_options["start_snapshot_id"] + if end_snapshot_id is None: + end_snapshot_id = changes_from_options.get("end_snapshot_id") + # AST. stmt = None if _emit_ast and self._ast is not None: @@ -697,6 +790,17 @@ def table( ast.stream.value = stream if ( + start_snapshot_id is not None + or end_snapshot_id is not None + or changes_from_options + ): + table = self._session.table( + name, + _emit_ast=False, + start_snapshot_id=start_snapshot_id, + end_snapshot_id=end_snapshot_id, + ) + elif ( time_travel_mode is not None or version is not None or version_tag is not None diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py index cd5ce3ecfb..0464134f3e 100644 --- a/src/snowflake/snowpark/session.py +++ b/src/snowflake/snowpark/session.py @@ -2780,6 +2780,8 @@ def table( # still pass them without us advertising the surface. version = kwargs.pop("version", None) version_tag = kwargs.pop("version_tag", None) + start_snapshot_id = kwargs.pop("start_snapshot_id", None) + end_snapshot_id = kwargs.pop("end_snapshot_id", None) if kwargs: raise TypeError( f"table() got unexpected keyword arguments: {sorted(kwargs)}" @@ -2823,6 +2825,8 @@ def table( stream=stream, version=version, version_tag=version_tag, + start_snapshot_id=start_snapshot_id, + end_snapshot_id=end_snapshot_id, ) # Replace API call origin for table set_api_call_source(t, "Session.table") diff --git a/src/snowflake/snowpark/table.py b/src/snowflake/snowpark/table.py index fc863a6434..c18a0408b4 100644 --- a/src/snowflake/snowpark/table.py +++ b/src/snowflake/snowpark/table.py @@ -33,6 +33,7 @@ from snowflake.snowpark._internal.telemetry import add_api_call, set_api_call_source from snowflake.snowpark._internal.type_utils import ColumnOrLiteral from snowflake.snowpark._internal.utils import ( + IcebergChangesConfig, publicapi, TimeTravelConfig, ) @@ -303,11 +304,18 @@ def __init__( # still pass them without us advertising the surface. version = kwargs.pop("version", None) version_tag = kwargs.pop("version_tag", None) + start_snapshot_id = kwargs.pop("start_snapshot_id", None) + end_snapshot_id = kwargs.pop("end_snapshot_id", None) if kwargs: raise TypeError( f"Table() got unexpected keyword arguments: {sorted(kwargs)}" ) + iceberg_changes_config = IcebergChangesConfig.validate_and_normalize_params( + start_snapshot_id=start_snapshot_id, + end_snapshot_id=end_snapshot_id, + ) + if _ast_stmt is None and session is not None and _emit_ast: _ast_stmt = session._ast_batch.bind() ast = with_src_position(_ast_stmt.expr.table, _ast_stmt) @@ -337,12 +345,19 @@ def __init__( version=version, version_tag=version_tag, ) + if iceberg_changes_config is not None and time_travel_config is not None: + raise ValueError( + "Cannot combine Iceberg incremental read " + "('start-snapshot-id' / 'end-snapshot-id') with time travel " + "options on the same read." + ) snowflake_table_plan = SnowflakeTable( table_name, session=session, is_temp_table_for_cleanup=is_temp_table_for_cleanup, time_travel_config=time_travel_config, + iceberg_changes_config=iceberg_changes_config, ) if session.sql_simplifier_enabled: plan = session._analyzer.create_select_statement( @@ -358,6 +373,7 @@ def __init__( self.table_name: str = table_name #: The table name self._is_temp_table_for_cleanup = is_temp_table_for_cleanup self._time_travel_config = time_travel_config + self._iceberg_changes_config = iceberg_changes_config # By default, the set the initial API call to say 'Table.__init__' since # people could instantiate a table directly. This value is overwritten when @@ -368,6 +384,13 @@ def _copy_without_ast(self): kwargs = {} if self._time_travel_config: kwargs.update(self._time_travel_config._asdict()) + if self._iceberg_changes_config: + kwargs.update( + { + "start_snapshot_id": self._iceberg_changes_config.start_version, + "end_snapshot_id": self._iceberg_changes_config.end_version, + } + ) return Table( self.table_name, diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 141b5b9b59..666c28bb89 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -20,6 +20,7 @@ get_line_numbers, get_plan_from_line_numbers, TimeTravelConfig, + IcebergChangesConfig, ) from snowflake.snowpark._internal.analyzer.snowflake_plan import ( SnowflakePlan, @@ -1193,3 +1194,44 @@ def test_extract_time_travel_version_tag_option(): _extract_time_travel_from_options( {"VERSION-TAG": "release_v1", "TIME_TRAVEL_MODE": "before"} ) + + +def test_iceberg_changes_config_generate_sql_with_end(): + config = IcebergChangesConfig(start_version=111, end_version=222) + assert ( + config.generate_sql_clause() + == " CHANGES (INFORMATION => APPEND_ONLY) AT (VERSION => 111) END (VERSION => 222)" + ) + + +def test_iceberg_changes_config_generate_sql_without_end(): + config = IcebergChangesConfig(start_version=111) + assert ( + config.generate_sql_clause() + == " CHANGES (INFORMATION => APPEND_ONLY) AT (VERSION => 111)" + ) + + +def test_iceberg_changes_config_validate_requires_start(): + with pytest.raises(ValueError, match="requires 'start-snapshot-id'"): + IcebergChangesConfig.validate_and_normalize_params(end_snapshot_id=42) + + +def test_iceberg_changes_config_validate_rejects_non_integer(): + with pytest.raises(ValueError, match="start-snapshot-id"): + IcebergChangesConfig.validate_and_normalize_params(start_snapshot_id="abc") + + +def test_extract_iceberg_changes_from_options(): + from snowflake.snowpark.dataframe_reader import ( + _extract_iceberg_changes_from_options, + ) + + assert _extract_iceberg_changes_from_options( + {"start-snapshot-id": "1", "end-snapshot-id": "2"} + ) == {"start_snapshot_id": 1, "end_snapshot_id": 2} + assert _extract_iceberg_changes_from_options({"start-snapshot-id": "5"}) == { + "start_snapshot_id": 5, + "end_snapshot_id": None, + } + assert _extract_iceberg_changes_from_options({}) == {} From 2de241b9a3c8a3712df60b136e39d721c665ecd7 Mon Sep 17 00:00:00 2001 From: ilesh garish <111810784+sfc-gh-igarish@users.noreply.github.com> Date: Mon, 22 Jun 2026 23:30:37 +0530 Subject: [PATCH 2/2] Add skipped integ tests for Iceberg incremental read CHANGES surface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirror the snapshot-id / version_tag integration test pattern: - test_iceberg_incremental_read_session_table_kwargs compares Session.table(start_snapshot_id=..., end_snapshot_id=...) against raw CHANGES ... AT(VERSION => ...) END(VERSION => ...) SQL. - test_iceberg_incremental_read_dataframe_reader_option checks the Spark option aliases match the kwargs path and asserts the emitted SQL contains the CHANGES clause. - test_iceberg_incremental_read_start_snapshot_only covers the start-only path (no END clause). All three are @pytest.mark.skip by default — same CLD + FEATURE_ICEBERG_TIME_TRAVEL prerequisites as the snapshot-id tests; run manually against sfctest0 / CLDUNITY.scosschema.snapshot_demo. Co-authored-by: Cursor --- tests/integ/test_dataframe.py | 144 ++++++++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) diff --git a/tests/integ/test_dataframe.py b/tests/integ/test_dataframe.py index e3bf73713f..51c14ec6bf 100644 --- a/tests/integ/test_dataframe.py +++ b/tests/integ/test_dataframe.py @@ -8490,3 +8490,147 @@ def test_iceberg_version_tag_time_travel_dataframe_reader_option(session): session.read.option("version-tag", tag_name).table(table_fqn).collect() ) assert via_kwarg == via_option == via_hyphen_option + + +# ---------------------------------------------------------------------- +# Iceberg incremental read (``start-snapshot-id`` / ``end-snapshot-id``). +# +# TODO(SNOW-XXXXXX): Wire these up to a CI test account that has: +# * a Catalog-Linked Database (CLD) such as cldUnity / cldglue, AND +# * an unmanaged Iceberg table inside it with at least two snapshots +# readable through ``INFORMATION_SCHEMA.GET_TABLE_VERSIONS(...)``. +# +# Snowflake expresses incremental reads as:: +# +# SELECT * FROM +# CHANGES (INFORMATION => APPEND_ONLY) +# AT (VERSION => ) +# END (VERSION => ) +# +# Like the snapshot-id / tag surfaces above, this currently requires +# ``FEATURE_ICEBERG_TIME_TRAVEL`` on the account and is scoped to +# unmanaged Iceberg tables in CLDs, so these tests are skipped by default +# and exercised manually against ``sfctest0`` (see the oss-iceberg-tests +# ``query.incremental_read`` scenario in snowflake-eng/sas). +# ---------------------------------------------------------------------- +@pytest.mark.skip( + reason=( + "Requires a CLD-linked unmanaged Iceberg table with multiple " + "snapshots and FEATURE_ICEBERG_TIME_TRAVEL enabled on the account. " + "Tested manually; see TODO above." + ) +) +def test_iceberg_incremental_read_session_table_kwargs(session): + """End-to-end: ``Session.table(..., start_snapshot_id=..., end_snapshot_id=...)`` + returns append-only changes between two Iceberg snapshots.""" + table_fqn = "CLDUNITY.scosschema.snapshot_demo" + + snapshot_ids = [ + row["SNAPSHOT_ID"] + for row in session.sql( + f"SELECT SNAPSHOT_ID FROM " + f"TABLE(INFORMATION_SCHEMA.GET_TABLE_VERSIONS('{table_fqn}')) " + "ORDER BY SNAPSHOT_TIMESTAMP" + ).collect() + ] + assert len(snapshot_ids) >= 2, "Demo table needs at least 2 snapshots" + start_id, end_id = snapshot_ids[0], snapshot_ids[1] + + via_kwargs = session.table( + table_fqn, + start_snapshot_id=start_id, + end_snapshot_id=end_id, + ).collect() + via_sql = session.sql( + f"SELECT * FROM {table_fqn} " + f"CHANGES (INFORMATION => APPEND_ONLY) " + f"AT (VERSION => {start_id}) " + f"END (VERSION => {end_id})" + ).collect() + assert via_kwargs == via_sql + + +@pytest.mark.skip( + reason=( + "Requires a CLD-linked unmanaged Iceberg table with multiple " + "snapshots and FEATURE_ICEBERG_TIME_TRAVEL enabled on the account. " + "Tested manually; see TODO above." + ) +) +def test_iceberg_incremental_read_dataframe_reader_option(session): + """End-to-end: ``session.read.option('start-snapshot-id', S1) + .option('end-snapshot-id', S2).table(...)`` routes through the Spark + Iceberg-compat aliases and emits the ``CHANGES ... AT(VERSION => ...)`` + SQL surface.""" + table_fqn = "CLDUNITY.scosschema.snapshot_demo" + + snapshot_ids = [ + row["SNAPSHOT_ID"] + for row in session.sql( + f"SELECT SNAPSHOT_ID FROM " + f"TABLE(INFORMATION_SCHEMA.GET_TABLE_VERSIONS('{table_fqn}')) " + "ORDER BY SNAPSHOT_TIMESTAMP" + ).collect() + ] + assert len(snapshot_ids) >= 2, "Demo table needs at least 2 snapshots" + start_id, end_id = snapshot_ids[0], snapshot_ids[1] + + via_kwargs = session.table( + table_fqn, + start_snapshot_id=start_id, + end_snapshot_id=end_id, + ).collect() + via_option = ( + session.read.option("start-snapshot-id", start_id) + .option("end-snapshot-id", end_id) + .table(table_fqn) + .collect() + ) + assert via_kwargs == via_option + + df = ( + session.read.option("start-snapshot-id", start_id) + .option("end-snapshot-id", end_id) + .table(table_fqn) + ) + df.collect() + sql = df.queries["queries"][0] + assert "CHANGES (INFORMATION => APPEND_ONLY)" in sql + assert f"AT (VERSION => {start_id})" in sql + assert f"END (VERSION => {end_id})" in sql + + +@pytest.mark.skip( + reason=( + "Requires a CLD-linked unmanaged Iceberg table with multiple " + "snapshots and FEATURE_ICEBERG_TIME_TRAVEL enabled on the account. " + "Tested manually; see TODO above." + ) +) +def test_iceberg_incremental_read_start_snapshot_only(session): + """End-to-end: ``start-snapshot-id`` without ``end-snapshot-id`` omits the + ``END (VERSION => ...)`` clause and reads append-only changes through the + current snapshot (Snowflake ``CHANGES`` default end point).""" + table_fqn = "CLDUNITY.scosschema.snapshot_demo" + + start_id = session.sql( + f"SELECT SNAPSHOT_ID FROM " + f"TABLE(INFORMATION_SCHEMA.GET_TABLE_VERSIONS('{table_fqn}')) " + "ORDER BY SNAPSHOT_TIMESTAMP LIMIT 1" + ).collect()[0]["SNAPSHOT_ID"] + + via_kwargs = session.table( + table_fqn, + start_snapshot_id=start_id, + ).collect() + via_option = ( + session.read.option("start-snapshot-id", start_id).table(table_fqn).collect() + ) + assert via_kwargs == via_option + + df = session.read.option("start-snapshot-id", start_id).table(table_fqn) + df.collect() + sql = df.queries["queries"][0] + assert "CHANGES (INFORMATION => APPEND_ONLY)" in sql + assert f"AT (VERSION => {start_id})" in sql + assert "END (VERSION =>" not in sql