Skip to content

Commit 21807f4

Browse files
SNOW-3473261: Add Iceberg branch / tag time travel via version_ref
Add ``version_ref`` time-travel parameter that emits Snowflake's ``AT(VERSION_REF => '<name>')`` clause — the unified branch / tag syntax that subsumes Spark Iceberg's ``VERSION AS OF '<x>'`` (branch + tag) and the 4-part ``branch_<x>`` / ``tag_<x>`` identifier forms.

Mirrors the merged ``version=`` (snapshot-id) PR (#4231):

* Hidden behind ``**kwargs`` on ``Session.table()``, ``DataFrameReader.table()``, and ``Table.__init__()`` — consumed by Snowpark Connect, not part of the advertised public surface yet.
* ``DataFrameReader`` reader option aliases ``version_ref`` and ``version-ref`` (matching the ``snapshot-id`` / ``snapshot_id`` style); both auto-set ``time_travel_mode='at'`` since branch / tag reads are positional, not range-of-time.
* Validation: ``time_travel_mode='before'`` is rejected; ``version_ref`` is mutually exclusive with the other time-travel parameters (``statement``, ``offset``, ``timestamp``, ``stream``, ``version``); non-string and empty refs are rejected.
* No AST proto changes — the field travels through ``**kwargs`` the same way ``version=`` does.
* No CHANGELOG entry — internal kwargs surface, not customer-visible (matches PR #4231).

Unit tests cover SQL emission, validation, conflicts, and ``_extract_time_travel_from_options`` extraction for both alias forms. Two integ tests under ``tests/integ/test_dataframe.py`` are added but ``pytest.mark.skip``'d pending a CI account with a CLD-linked unmanaged Iceberg table that has named branches / tags and ``FEATURE_ICEBERG_TIME_TRAVEL`` enabled (manual reproducer lives in the snowflake-eng/sas repo).

Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 4afc47d commit 21807f4

6 files changed

Lines changed: 296 additions & 20 deletions

File tree

src/snowflake/snowpark/_internal/utils.py

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1956,6 +1956,7 @@ class TimeTravelConfig(NamedTuple):
19561956
timestamp_type: Optional[str] = None
19571957
stream: Optional[str] = None
19581958
version: Optional[int] = None
1959+
version_ref: Optional[str] = None
19591960

19601961
@staticmethod
19611962
def validate_and_normalize_params(
@@ -1966,6 +1967,7 @@ def validate_and_normalize_params(
19661967
timestamp_type: Optional[Union[str, "TimestampTimeZone"]] = None,
19671968
stream: Optional[str] = None,
19681969
version: Optional[int] = None,
1970+
version_ref: Optional[str] = None,
19691971
) -> Optional["TimeTravelConfig"]:
19701972
"""
19711973
Validates and normalizes time travel parameters.
@@ -1988,7 +1990,8 @@ def validate_and_normalize_params(
19881990
ValueError: If parameters are invalid.
19891991
"""
19901992
time_travel_arg_count = sum(
1991-
arg is not None for arg in (statement, offset, timestamp, stream, version)
1993+
arg is not None
1994+
for arg in (statement, offset, timestamp, stream, version, version_ref)
19921995
)
19931996

19941997
# Validate mode
@@ -2023,10 +2026,34 @@ def validate_and_normalize_params(
20232026
f"'version' must be an int Iceberg snapshot id, got {type(version).__name__}."
20242027
)
20252028

2029+
# version_ref (Iceberg branch / tag name, unified under Snowflake's
2030+
# ``AT(VERSION_REF => '<name>')`` grammar) only works with 'at' mode —
2031+
# Iceberg branch / tag reads are positional, not range-of-time, so
2032+
# ``BEFORE`` has no meaning.
2033+
if version_ref is not None and time_travel_mode.lower() != "at":
2034+
raise ValueError(
2035+
"Iceberg version_ref time travel can only be used with "
2036+
"time_travel_mode='at', not 'before'."
2037+
)
2038+
2039+
# Validate version_ref type — Iceberg ref names are strings (branch /
2040+
# tag identifiers). Empty strings are invalid.
2041+
if version_ref is not None:
2042+
if not isinstance(version_ref, str):
2043+
raise ValueError(
2044+
f"'version_ref' must be a string Iceberg branch / tag name, "
2045+
f"got {type(version_ref).__name__}."
2046+
)
2047+
if not version_ref:
2048+
raise ValueError(
2049+
"'version_ref' must be a non-empty Iceberg branch / tag name."
2050+
)
2051+
20262052
# Validate exactly one parameter is provided
20272053
if time_travel_arg_count != 1:
20282054
raise ValueError(
2029-
"Exactly one of 'statement', 'offset', 'timestamp', 'stream', or 'version' must be provided."
2055+
"Exactly one of 'statement', 'offset', 'timestamp', 'stream', "
2056+
"'version', or 'version_ref' must be provided."
20302057
)
20312058

20322059
# Normalize timestamp
@@ -2061,6 +2088,7 @@ def validate_and_normalize_params(
20612088
timestamp_type=timestamp_type,
20622089
stream=stream,
20632090
version=version,
2091+
version_ref=version_ref,
20642092
)
20652093

20662094
def generate_sql_clause(self) -> str:
@@ -2069,8 +2097,10 @@ def generate_sql_clause(self) -> str:
20692097
Args:
20702098
config: Time travel configuration.
20712099
Returns:
2072-
SQL clause like " AT (TIMESTAMP => TO_TIMESTAMP_NTZ('...'))" or
2073-
" AT (VERSION => 1234567890)" for Iceberg snapshot id time travel.
2100+
SQL clause like " AT (TIMESTAMP => TO_TIMESTAMP_NTZ('...'))",
2101+
" AT (VERSION => 1234567890)" for Iceberg snapshot id time travel,
2102+
or " AT (VERSION_REF => 'release_v1')" for Iceberg branch / tag
2103+
time travel.
20742104
"""
20752105
clause = f" {self.time_travel_mode.upper()} "
20762106

@@ -2082,6 +2112,8 @@ def generate_sql_clause(self) -> str:
20822112
clause += f"(STREAM => '{self.stream}')"
20832113
elif self.version is not None:
20842114
clause += f"(VERSION => {self.version})"
2115+
elif self.version_ref is not None:
2116+
clause += f"(VERSION_REF => '{self.version_ref}')"
20852117
elif self.timestamp is not None:
20862118
if self.timestamp_type is not None:
20872119
timestamp_type = self.timestamp_type.upper()

src/snowflake/snowpark/dataframe_reader.py

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,14 @@ def _extract_time_travel_from_options(options: dict) -> dict:
161161
- Automatically set time_travel_mode to 'at'
162162
(Iceberg snapshot ids only support ``AT(VERSION => N)``, not ``BEFORE``)
163163
- Cannot be used with time_travel_mode='before' (raises error)
164+
165+
Special handling for 'VERSION_REF' / 'VERSION-REF' (Iceberg branch / tag
166+
name) — both aliases map to the internal ``version_ref`` time travel
167+
parameter and emit ``AT(VERSION_REF => '<name>')`` on the Snowflake side
168+
(unified branch / tag clause, see Spark Iceberg's ``VERSION AS OF '<x>'``):
169+
- Automatically set time_travel_mode to 'at'
170+
(branch / tag reads are positional, not range-of-time)
171+
- Cannot be used with time_travel_mode='before' (raises error)
164172
"""
165173
result = {}
166174
excluded_keys = set()
@@ -211,6 +219,27 @@ def _extract_time_travel_from_options(options: dict) -> dict:
211219
)
212220
result["time_travel_mode"] = "at"
213221

222+
# Handle Iceberg branch / tag ref (``version_ref`` / ``version-ref``).
223+
# Both aliases route to the internal ``version_ref`` parameter and emit
224+
# ``AT(VERSION_REF => '<name>')`` server-side. Auto-sets mode='at'.
225+
version_ref_value = options.get("VERSION_REF")
226+
version_ref_source = "version_ref"
227+
if version_ref_value is None:
228+
version_ref_value = options.get("VERSION-REF")
229+
version_ref_source = "version-ref"
230+
if version_ref_value is not None:
231+
if (
232+
"TIME_TRAVEL_MODE" in options
233+
and options["TIME_TRAVEL_MODE"].lower() == "before"
234+
):
235+
raise ValueError(
236+
f"Cannot use '{version_ref_source}' option with "
237+
"time_travel_mode='before'. Iceberg branch / tag time travel "
238+
"only supports time_travel_mode='at'."
239+
)
240+
result["version_ref"] = str(version_ref_value)
241+
result["time_travel_mode"] = "at"
242+
214243
for option_key, param_name in _TIME_TRAVEL_OPTIONS_PARAMS_MAP.items():
215244
if option_key in options and option_key not in excluded_keys:
216245
result[param_name] = options[option_key]
@@ -634,11 +663,13 @@ def table(
634663
... .option("offset", -60) # This will be IGNORED
635664
... .table("my_table", time_travel_mode="at", offset=-3600)) # Only this is used
636665
"""
637-
# ``version`` (Iceberg snapshot id) is intentionally not in the public
638-
# signature — it's consumed by Snowpark Connect and may be removed
639-
# once a first-class API lands. Accept it through **kwargs so direct
640-
# callers can still pass it without us advertising it.
666+
# ``version`` (Iceberg snapshot id) and ``version_ref`` (Iceberg
667+
# branch / tag name) are intentionally not in the public signature —
668+
# they are consumed by Snowpark Connect and may be removed once a
669+
# first-class API lands. Accept them through **kwargs so direct
670+
# callers can still pass them without us advertising the surface.
641671
version = kwargs.pop("version", None)
672+
version_ref = kwargs.pop("version_ref", None)
642673
if kwargs:
643674
raise TypeError(
644675
f"table() got unexpected keyword arguments: {sorted(kwargs)}"
@@ -664,13 +695,20 @@ def table(
664695
if stream is not None:
665696
ast.stream.value = stream
666697

667-
if time_travel_mode is not None or version is not None:
668-
# If version is provided without mode, default to 'at' (snapshot ids
669-
# only make sense with AT — symmetric with iceberg_tag handling).
698+
if (
699+
time_travel_mode is not None
700+
or version is not None
701+
or version_ref is not None
702+
):
703+
# If version / version_ref is provided without mode, default to
704+
# 'at' — snapshot ids and branch / tag reads only make sense with
705+
# AT (symmetric with the as-of-timestamp option handling).
670706
effective_mode = (
671707
time_travel_mode
672708
if time_travel_mode
673-
else ("at" if version is not None else None)
709+
else (
710+
"at" if (version is not None or version_ref is not None) else None
711+
)
674712
)
675713
time_travel_params = {
676714
"time_travel_mode": effective_mode,
@@ -680,6 +718,7 @@ def table(
680718
"timestamp_type": timestamp_type,
681719
"stream": stream,
682720
"version": version,
721+
"version_ref": version_ref,
683722
}
684723
else:
685724
# if time_travel_mode is not provided, extract time travel config from options

src/snowflake/snowpark/session.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2773,11 +2773,13 @@ def table(
27732773
# timestamp_type remains "NTZ" (user's explicit choice respected)
27742774
>>> table2 = session.read.table("my_table", time_travel_mode="at", timestamp=tz_aware, timestamp_type="NTZ") # doctest: +SKIP
27752775
"""
2776-
# ``version`` (Iceberg snapshot id) is intentionally not in the public
2777-
# signature — it's consumed by Snowpark Connect and may be removed
2778-
# once a first-class API lands. Accept it through **kwargs so direct
2779-
# callers can still pass it without us advertising it.
2776+
# ``version`` (Iceberg snapshot id) and ``version_ref`` (Iceberg
2777+
# branch / tag name) are intentionally not in the public signature —
2778+
# they are consumed by Snowpark Connect and may be removed once a
2779+
# first-class API lands. Accept them through **kwargs so direct
2780+
# callers can still pass them without us advertising the surface.
27802781
version = kwargs.pop("version", None)
2782+
version_ref = kwargs.pop("version_ref", None)
27812783
if kwargs:
27822784
raise TypeError(
27832785
f"table() got unexpected keyword arguments: {sorted(kwargs)}"
@@ -2820,6 +2822,7 @@ def table(
28202822
timestamp_type=timestamp_type,
28212823
stream=stream,
28222824
version=version,
2825+
version_ref=version_ref,
28232826
)
28242827
# Replace API call origin for table
28252828
set_api_call_source(t, "Session.table")

src/snowflake/snowpark/table.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,11 +296,13 @@ def __init__(
296296
stream: Optional[str] = None,
297297
**kwargs,
298298
) -> None:
299-
# ``version`` (Iceberg snapshot id) is intentionally not in the public
300-
# signature — it's consumed by Snowpark Connect and may be removed
301-
# once a first-class API lands. Accept it through **kwargs so direct
302-
# callers can still pass it without us advertising it.
299+
# ``version`` (Iceberg snapshot id) and ``version_ref`` (Iceberg
300+
# branch / tag name) are intentionally not in the public signature —
301+
# they are consumed by Snowpark Connect and may be removed once a
302+
# first-class API lands. Accept them through **kwargs so direct
303+
# callers can still pass them without us advertising the surface.
303304
version = kwargs.pop("version", None)
305+
version_ref = kwargs.pop("version_ref", None)
304306
if kwargs:
305307
raise TypeError(
306308
f"Table() got unexpected keyword arguments: {sorted(kwargs)}"
@@ -333,6 +335,7 @@ def __init__(
333335
timestamp_type=timestamp_type,
334336
stream=stream,
335337
version=version,
338+
version_ref=version_ref,
336339
)
337340

338341
snowflake_table_plan = SnowflakeTable(

tests/integ/test_dataframe.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8423,3 +8423,72 @@ def test_iceberg_snapshot_id_time_travel_dataframe_reader_option(session):
84238423
session.read.option("snapshot-id", snapshot_id).table(table_fqn).collect()
84248424
)
84258425
assert via_kwarg == via_option
8426+
8427+
8428+
# ----------------------------------------------------------------------
8429+
# Iceberg branch / tag (``version_ref=``) time travel.
8430+
#
8431+
# TODO(SNOW-3473261): Wire these up to a CI test account that has:
8432+
# * a Catalog-Linked Database (CLD) such as cldUnity / cldglue, AND
8433+
# * an unmanaged Iceberg table inside it that exposes a named branch or
8434+
# tag (e.g. created via ``ALTER TABLE ... CREATE TAG <name>`` on the
8435+
# OSS Iceberg side, or via the catalog's branch / tag API).
8436+
#
8437+
# Snowflake's ``AT(VERSION_REF => '<name>')`` syntax is the unified
8438+
# branch / tag clause that subsumes Spark Iceberg's
8439+
# ``VERSION AS OF '<x>'`` (which accepts both branch and tag names) and
8440+
# the 4-part ``prod.db.t.branch_<x>`` / ``prod.db.t.tag_<x>`` identifier forms.
8441+
# Like the snapshot-id surface, it currently requires
8442+
# ``FEATURE_ICEBERG_TIME_TRAVEL`` on the account and is scoped to
8443+
# unmanaged Iceberg tables in CLDs, so these tests are skipped by default
8444+
# and exercised manually against ``sfctest0`` (see
8445+
# ``tests/sas_tests/test_iceberg_version_ref_sample.py`` in
8446+
# snowflake-eng/sas for the manual reproducer).
8447+
# ----------------------------------------------------------------------
8448+
@pytest.mark.skip(
8449+
reason=(
8450+
"Requires a CLD-linked unmanaged Iceberg table with at least one "
8451+
"named branch / tag and FEATURE_ICEBERG_TIME_TRAVEL enabled on the "
8452+
"account. Tested manually; see TODO above."
8453+
)
8454+
)
8455+
def test_iceberg_version_ref_time_travel_session_table(session):
8456+
"""End-to-end: ``Session.table(..., version_ref='<name>')`` returns the
8457+
table state at the requested Iceberg branch / tag ref."""
8458+
table_fqn = "CLDUNITY.scosschema.snapshot_demo"
8459+
# Demo table is set up with a tag ``first_load`` pointing at the
8460+
# earliest snapshot (see sas-side reproducer for the setup script).
8461+
tag_name = "first_load"
8462+
8463+
tagged = session.table(
8464+
table_fqn, time_travel_mode="at", version_ref=tag_name
8465+
).collect()
8466+
latest = session.table(table_fqn).collect()
8467+
# Tag reads use the snapshot's schema as it existed at the tag —
8468+
# row count at an earlier tag should be ≤ current row count, since the
8469+
# tag is bound to a specific (earlier) snapshot.
8470+
assert len(tagged) <= len(latest)
8471+
8472+
8473+
@pytest.mark.skip(
8474+
reason=(
8475+
"Requires a CLD-linked unmanaged Iceberg table with at least one "
8476+
"named branch / tag and FEATURE_ICEBERG_TIME_TRAVEL enabled on the "
8477+
"account. Tested manually; see TODO above."
8478+
)
8479+
)
8480+
def test_iceberg_version_ref_time_travel_dataframe_reader_option(session):
8481+
"""End-to-end: ``session.read.option('version_ref', 'name').table(...)``
8482+
routes through the Iceberg-compat option alias and produces the same
8483+
result as the explicit ``version_ref=`` kwarg."""
8484+
table_fqn = "CLDUNITY.scosschema.snapshot_demo"
8485+
tag_name = "first_load"
8486+
8487+
via_kwarg = session.read.table(
8488+
table_fqn, time_travel_mode="at", version_ref=tag_name
8489+
).collect()
8490+
via_option = session.read.option("version_ref", tag_name).table(table_fqn).collect()
8491+
via_hyphen_option = (
8492+
session.read.option("version-ref", tag_name).table(table_fqn).collect()
8493+
)
8494+
assert via_kwarg == via_option == via_hyphen_option

0 commit comments

Comments
 (0)