Skip to content

Commit 05bf4cf

Browse files
SNOW-3473261: Add Iceberg tag time travel via version_tag
Add ``version_tag`` time-travel parameter that emits Snowflake's released ``AT(VERSION_TAG => '<name>')`` clause — the tag-only form of Iceberg time travel (Spark Iceberg's ``VERSION AS OF '<tag_name>'`` for tag reads). Scope: tag reads only. Branch reads are deferred until Snowflake's unified ``VERSION_REF`` syntax (which subsumes branch + tag) ships; the Python kwarg ``version_tag`` and SQL ``VERSION_TAG`` keyword match Snowflake's already-released grammar. Mirrors the merged ``version=`` (snapshot-id) PR (#4231): * Hidden behind ``**kwargs`` on ``Session.table()``, ``DataFrameReader.table()``, and ``Table.__init__()`` — consumed by Snowpark Connect, not part of the advertised public surface yet. * ``DataFrameReader`` reader option aliases ``version_tag`` and ``version-tag`` (matching the ``snapshot-id`` / ``snapshot_id`` style); both auto-set ``time_travel_mode='at'`` since tag reads are positional (bound to a specific snapshot), not range-of-time. * Validation: ``time_travel_mode='before'`` is rejected; ``version_tag`` is mutually exclusive with the other time-travel parameters (``statement``, ``offset``, ``timestamp``, ``stream``, ``version``); non-string and empty tag names are rejected. * No AST proto changes — the field travels through ``**kwargs`` the same way ``version=`` does. * No CHANGELOG entry — internal kwargs surface, not customer-visible (matches PR #4231). Unit tests cover SQL emission, validation, conflicts, and ``_extract_time_travel_from_options`` extraction for both alias forms. Two integ tests under ``tests/integ/test_dataframe.py`` are added but ``pytest.mark.skip``'d pending a CI account with a CLD-linked unmanaged Iceberg table that has named tags and ``FEATURE_ICEBERG_TIME_TRAVEL`` enabled (manual reproducer lives in the snowflake-eng/sas repo). Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 4afc47d commit 05bf4cf

6 files changed

Lines changed: 290 additions & 20 deletions

File tree

src/snowflake/snowpark/_internal/utils.py

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1956,6 +1956,7 @@ class TimeTravelConfig(NamedTuple):
19561956
timestamp_type: Optional[str] = None
19571957
stream: Optional[str] = None
19581958
version: Optional[int] = None
1959+
version_tag: Optional[str] = None
19591960

19601961
@staticmethod
19611962
def validate_and_normalize_params(
@@ -1966,6 +1967,7 @@ def validate_and_normalize_params(
19661967
timestamp_type: Optional[Union[str, "TimestampTimeZone"]] = None,
19671968
stream: Optional[str] = None,
19681969
version: Optional[int] = None,
1970+
version_tag: Optional[str] = None,
19691971
) -> Optional["TimeTravelConfig"]:
19701972
"""
19711973
Validates and normalizes time travel parameters.
@@ -1988,7 +1990,8 @@ def validate_and_normalize_params(
19881990
ValueError: If parameters are invalid.
19891991
"""
19901992
time_travel_arg_count = sum(
1991-
arg is not None for arg in (statement, offset, timestamp, stream, version)
1993+
arg is not None
1994+
for arg in (statement, offset, timestamp, stream, version, version_tag)
19921995
)
19931996

19941997
# Validate mode
@@ -2023,10 +2026,32 @@ def validate_and_normalize_params(
20232026
f"'version' must be an int Iceberg snapshot id, got {type(version).__name__}."
20242027
)
20252028

2029+
# version_tag (Iceberg tag name, mapped to Snowflake's
2030+
# ``AT(VERSION_TAG => '<name>')`` grammar) only works with 'at' mode —
2031+
# Iceberg tag reads are positional (bound to a specific snapshot),
2032+
# not range-of-time, so ``BEFORE`` has no meaning.
2033+
if version_tag is not None and time_travel_mode.lower() != "at":
2034+
raise ValueError(
2035+
"Iceberg version_tag time travel can only be used with "
2036+
"time_travel_mode='at', not 'before'."
2037+
)
2038+
2039+
# Validate version_tag type — Iceberg tag names are strings. Empty
2040+
# strings are invalid.
2041+
if version_tag is not None:
2042+
if not isinstance(version_tag, str):
2043+
raise ValueError(
2044+
f"'version_tag' must be a string Iceberg tag name, "
2045+
f"got {type(version_tag).__name__}."
2046+
)
2047+
if not version_tag:
2048+
raise ValueError("'version_tag' must be a non-empty Iceberg tag name.")
2049+
20262050
# Validate exactly one parameter is provided
20272051
if time_travel_arg_count != 1:
20282052
raise ValueError(
2029-
"Exactly one of 'statement', 'offset', 'timestamp', 'stream', or 'version' must be provided."
2053+
"Exactly one of 'statement', 'offset', 'timestamp', 'stream', "
2054+
"'version', or 'version_tag' must be provided."
20302055
)
20312056

20322057
# Normalize timestamp
@@ -2061,6 +2086,7 @@ def validate_and_normalize_params(
20612086
timestamp_type=timestamp_type,
20622087
stream=stream,
20632088
version=version,
2089+
version_tag=version_tag,
20642090
)
20652091

20662092
def generate_sql_clause(self) -> str:
@@ -2069,8 +2095,10 @@ def generate_sql_clause(self) -> str:
20692095
Args:
20702096
config: Time travel configuration.
20712097
Returns:
2072-
SQL clause like " AT (TIMESTAMP => TO_TIMESTAMP_NTZ('...'))" or
2073-
" AT (VERSION => 1234567890)" for Iceberg snapshot id time travel.
2098+
SQL clause like " AT (TIMESTAMP => TO_TIMESTAMP_NTZ('...'))",
2099+
" AT (VERSION => 1234567890)" for Iceberg snapshot id time travel,
2100+
or " AT (VERSION_TAG => 'release_v1')" for Iceberg tag time
2101+
travel.
20742102
"""
20752103
clause = f" {self.time_travel_mode.upper()} "
20762104

@@ -2082,6 +2110,8 @@ def generate_sql_clause(self) -> str:
20822110
clause += f"(STREAM => '{self.stream}')"
20832111
elif self.version is not None:
20842112
clause += f"(VERSION => {self.version})"
2113+
elif self.version_tag is not None:
2114+
clause += f"(VERSION_TAG => '{self.version_tag}')"
20852115
elif self.timestamp is not None:
20862116
if self.timestamp_type is not None:
20872117
timestamp_type = self.timestamp_type.upper()

src/snowflake/snowpark/dataframe_reader.py

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,15 @@ def _extract_time_travel_from_options(options: dict) -> dict:
161161
- Automatically set time_travel_mode to 'at'
162162
(Iceberg snapshot ids only support ``AT(VERSION => N)``, not ``BEFORE``)
163163
- Cannot be used with time_travel_mode='before' (raises error)
164+
165+
Special handling for 'VERSION_TAG' / 'VERSION-TAG' (Iceberg tag name) —
166+
both aliases map to the internal ``version_tag`` time travel parameter
167+
and emit ``AT(VERSION_TAG => '<name>')`` on the Snowflake side (see
168+
Spark Iceberg's ``VERSION AS OF '<tag_name>'`` reader path):
169+
- Automatically set time_travel_mode to 'at'
170+
(tag reads are positional — bound to a specific snapshot — not
171+
range-of-time)
172+
- Cannot be used with time_travel_mode='before' (raises error)
164173
"""
165174
result = {}
166175
excluded_keys = set()
@@ -211,6 +220,27 @@ def _extract_time_travel_from_options(options: dict) -> dict:
211220
)
212221
result["time_travel_mode"] = "at"
213222

223+
# Handle Iceberg tag (``version_tag`` / ``version-tag``). Both aliases
224+
# route to the internal ``version_tag`` parameter and emit
225+
# ``AT(VERSION_TAG => '<name>')`` server-side. Auto-sets mode='at'.
226+
version_tag_value = options.get("VERSION_TAG")
227+
version_tag_source = "version_tag"
228+
if version_tag_value is None:
229+
version_tag_value = options.get("VERSION-TAG")
230+
version_tag_source = "version-tag"
231+
if version_tag_value is not None:
232+
if (
233+
"TIME_TRAVEL_MODE" in options
234+
and options["TIME_TRAVEL_MODE"].lower() == "before"
235+
):
236+
raise ValueError(
237+
f"Cannot use '{version_tag_source}' option with "
238+
"time_travel_mode='before'. Iceberg tag time travel only "
239+
"supports time_travel_mode='at'."
240+
)
241+
result["version_tag"] = str(version_tag_value)
242+
result["time_travel_mode"] = "at"
243+
214244
for option_key, param_name in _TIME_TRAVEL_OPTIONS_PARAMS_MAP.items():
215245
if option_key in options and option_key not in excluded_keys:
216246
result[param_name] = options[option_key]
@@ -634,11 +664,13 @@ def table(
634664
... .option("offset", -60) # This will be IGNORED
635665
... .table("my_table", time_travel_mode="at", offset=-3600)) # Only this is used
636666
"""
637-
# ``version`` (Iceberg snapshot id) is intentionally not in the public
638-
# signature — it's consumed by Snowpark Connect and may be removed
639-
# once a first-class API lands. Accept it through **kwargs so direct
640-
# callers can still pass it without us advertising it.
667+
# ``version`` (Iceberg snapshot id) and ``version_tag`` (Iceberg tag
668+
# name) are intentionally not in the public signature — they are
669+
# consumed by Snowpark Connect and may be removed once a first-class
670+
# API lands. Accept them through **kwargs so direct callers can
671+
# still pass them without us advertising the surface.
641672
version = kwargs.pop("version", None)
673+
version_tag = kwargs.pop("version_tag", None)
642674
if kwargs:
643675
raise TypeError(
644676
f"table() got unexpected keyword arguments: {sorted(kwargs)}"
@@ -664,13 +696,20 @@ def table(
664696
if stream is not None:
665697
ast.stream.value = stream
666698

667-
if time_travel_mode is not None or version is not None:
668-
# If version is provided without mode, default to 'at' (snapshot ids
669-
# only make sense with AT — symmetric with iceberg_tag handling).
699+
if (
700+
time_travel_mode is not None
701+
or version is not None
702+
or version_tag is not None
703+
):
704+
# If version / version_tag is provided without mode, default to
705+
# 'at' — snapshot ids and tag reads only make sense with AT
706+
# (symmetric with the as-of-timestamp option handling).
670707
effective_mode = (
671708
time_travel_mode
672709
if time_travel_mode
673-
else ("at" if version is not None else None)
710+
else (
711+
"at" if (version is not None or version_tag is not None) else None
712+
)
674713
)
675714
time_travel_params = {
676715
"time_travel_mode": effective_mode,
@@ -680,6 +719,7 @@ def table(
680719
"timestamp_type": timestamp_type,
681720
"stream": stream,
682721
"version": version,
722+
"version_tag": version_tag,
683723
}
684724
else:
685725
# if time_travel_mode is not provided, extract time travel config from options

src/snowflake/snowpark/session.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2773,11 +2773,13 @@ def table(
27732773
# timestamp_type remains "NTZ" (user's explicit choice respected)
27742774
>>> table2 = session.read.table("my_table", time_travel_mode="at", timestamp=tz_aware, timestamp_type="NTZ") # doctest: +SKIP
27752775
"""
2776-
# ``version`` (Iceberg snapshot id) is intentionally not in the public
2777-
# signature — it's consumed by Snowpark Connect and may be removed
2778-
# once a first-class API lands. Accept it through **kwargs so direct
2779-
# callers can still pass it without us advertising it.
2776+
# ``version`` (Iceberg snapshot id) and ``version_tag`` (Iceberg tag
2777+
# name) are intentionally not in the public signature — they are
2778+
# consumed by Snowpark Connect and may be removed once a first-class
2779+
# API lands. Accept them through **kwargs so direct callers can
2780+
# still pass them without us advertising the surface.
27802781
version = kwargs.pop("version", None)
2782+
version_tag = kwargs.pop("version_tag", None)
27812783
if kwargs:
27822784
raise TypeError(
27832785
f"table() got unexpected keyword arguments: {sorted(kwargs)}"
@@ -2820,6 +2822,7 @@ def table(
28202822
timestamp_type=timestamp_type,
28212823
stream=stream,
28222824
version=version,
2825+
version_tag=version_tag,
28232826
)
28242827
# Replace API call origin for table
28252828
set_api_call_source(t, "Session.table")

src/snowflake/snowpark/table.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,11 +296,13 @@ def __init__(
296296
stream: Optional[str] = None,
297297
**kwargs,
298298
) -> None:
299-
# ``version`` (Iceberg snapshot id) is intentionally not in the public
300-
# signature — it's consumed by Snowpark Connect and may be removed
301-
# once a first-class API lands. Accept it through **kwargs so direct
302-
# callers can still pass it without us advertising it.
299+
# ``version`` (Iceberg snapshot id) and ``version_tag`` (Iceberg tag
300+
# name) are intentionally not in the public signature — they are
301+
# consumed by Snowpark Connect and may be removed once a first-class
302+
# API lands. Accept them through **kwargs so direct callers can
303+
# still pass them without us advertising the surface.
303304
version = kwargs.pop("version", None)
305+
version_tag = kwargs.pop("version_tag", None)
304306
if kwargs:
305307
raise TypeError(
306308
f"Table() got unexpected keyword arguments: {sorted(kwargs)}"
@@ -333,6 +335,7 @@ def __init__(
333335
timestamp_type=timestamp_type,
334336
stream=stream,
335337
version=version,
338+
version_tag=version_tag,
336339
)
337340

338341
snowflake_table_plan = SnowflakeTable(

tests/integ/test_dataframe.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8423,3 +8423,70 @@ def test_iceberg_snapshot_id_time_travel_dataframe_reader_option(session):
84238423
session.read.option("snapshot-id", snapshot_id).table(table_fqn).collect()
84248424
)
84258425
assert via_kwarg == via_option
8426+
8427+
8428+
# ----------------------------------------------------------------------
8429+
# Iceberg tag (``version_tag=``) time travel.
8430+
#
8431+
# TODO(SNOW-3473261): Wire these up to a CI test account that has:
8432+
# * a Catalog-Linked Database (CLD) such as cldUnity / cldglue, AND
8433+
# * an unmanaged Iceberg table inside it that exposes a named tag
8434+
# (e.g. created via ``ALTER TABLE ... CREATE TAG <name>`` on the OSS
8435+
# Iceberg side, or via the catalog's tag API).
8436+
#
8437+
# Snowflake's ``AT(VERSION_TAG => '<name>')`` is the released tag-only
8438+
# Iceberg time-travel clause (Spark Iceberg's
8439+
# ``VERSION AS OF '<tag_name>'`` for tag reads). Like the snapshot-id
8440+
# surface, it currently requires ``FEATURE_ICEBERG_TIME_TRAVEL`` on the
8441+
# account and is scoped to unmanaged Iceberg tables in CLDs, so these
8442+
# tests are skipped by default and exercised manually against
8443+
# ``sfctest0`` (see ``tests/sas_tests/test_iceberg_version_tag_sample.py``
8444+
# in snowflake-eng/sas for the manual reproducer).
8445+
# ----------------------------------------------------------------------
8446+
@pytest.mark.skip(
8447+
reason=(
8448+
"Requires a CLD-linked unmanaged Iceberg table with at least one "
8449+
"named tag and FEATURE_ICEBERG_TIME_TRAVEL enabled on the account. "
8450+
"Tested manually; see TODO above."
8451+
)
8452+
)
8453+
def test_iceberg_version_tag_time_travel_session_table(session):
8454+
"""End-to-end: ``Session.table(..., version_tag='<name>')`` returns the
8455+
table state at the requested Iceberg tag."""
8456+
table_fqn = "CLDUNITY.scosschema.snapshot_demo"
8457+
# Demo table is set up with a tag ``first_load`` pointing at the
8458+
# earliest snapshot (see sas-side reproducer for the setup script).
8459+
tag_name = "first_load"
8460+
8461+
tagged = session.table(
8462+
table_fqn, time_travel_mode="at", version_tag=tag_name
8463+
).collect()
8464+
latest = session.table(table_fqn).collect()
8465+
# Tag reads use the snapshot's schema as it existed at the tag —
8466+
# row count at an earlier tag should be ≤ current row count, since the
8467+
# tag is bound to a specific (earlier) snapshot.
8468+
assert len(tagged) <= len(latest)
8469+
8470+
8471+
@pytest.mark.skip(
8472+
reason=(
8473+
"Requires a CLD-linked unmanaged Iceberg table with at least one "
8474+
"named tag and FEATURE_ICEBERG_TIME_TRAVEL enabled on the account. "
8475+
"Tested manually; see TODO above."
8476+
)
8477+
)
8478+
def test_iceberg_version_tag_time_travel_dataframe_reader_option(session):
8479+
"""End-to-end: ``session.read.option('version_tag', 'name').table(...)``
8480+
routes through the Iceberg-compat option alias and produces the same
8481+
result as the explicit ``version_tag=`` kwarg."""
8482+
table_fqn = "CLDUNITY.scosschema.snapshot_demo"
8483+
tag_name = "first_load"
8484+
8485+
via_kwarg = session.read.table(
8486+
table_fqn, time_travel_mode="at", version_tag=tag_name
8487+
).collect()
8488+
via_option = session.read.option("version_tag", tag_name).table(table_fqn).collect()
8489+
via_hyphen_option = (
8490+
session.read.option("version-tag", tag_name).table(table_fqn).collect()
8491+
)
8492+
assert via_kwarg == via_option == via_hyphen_option

0 commit comments

Comments
 (0)