Commit fb5c52f

Feat: Enable Iceberg support for Snowflake (#4262)
1 parent 99e9629 commit fb5c52f

7 files changed

Lines changed: 199 additions & 7 deletions

.circleci/config.yml

Lines changed: 2 additions & 1 deletion
@@ -17,7 +17,7 @@ on_tag_filter: &on_tag_filter
       only: /^v.+/
 
 orbs:
-  path-filtering: circleci/path-filtering@1.1.0
+  path-filtering: circleci/path-filtering@1.2.0
 
 jobs:
   vscode-extension-setup:
@@ -116,6 +116,7 @@ workflows:
             pytest.ini|setup.cfg|setup.py python true
             \.circleci/.*|Makefile|\.pre-commit-config\.yaml common true
             vscode/extensions/.* vscode true
+          tag: "3.9"
 
       - vscode-extension-setup:
           <<: *on_main_or_tag_filter

docs/integrations/engines/snowflake.md

Lines changed: 51 additions & 5 deletions
@@ -525,7 +525,7 @@ MODEL (
   ),
 );
 ```
-
+
 ## Custom View and Table types
 
 SQLMesh supports custom view and table types for Snowflake models. You can apply these modifiers to either the physical layer or virtual layer of a model using the `physical_properties` and `virtual_properties` attributes respectively. For example:
@@ -535,8 +535,8 @@ SQLMesh supports custom view and table types for Snowflake models. You can apply
 A table can be exposed through a `SECURE` view in the virtual layer by specifying the `creatable_type` property and setting it to `SECURE`:
 
 ```sql linenums="1"
-Model (
-  name = schema_name.model_name,
+MODEL (
+  name schema_name.model_name,
   virtual_properties (
     creatable_type = SECURE
   )
@@ -550,8 +550,8 @@ SELECT a FROM schema_name.model_b;
 A model can use a `TRANSIENT` table in the physical layer by specifying the `creatable_type` property and setting it to `TRANSIENT`:
 
 ```sql linenums="1"
-Model (
-  name = schema_name.model_name,
+MODEL (
+  name schema_name.model_name,
   physical_properties (
     creatable_type = TRANSIENT
   )
@@ -560,6 +560,52 @@ Model (
 SELECT a FROM schema_name.model_b;
 ```
 
+### Iceberg Tables
+
+In order for Snowflake to be able to create an Iceberg table, there must be an [External Volume](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume) configured to store the Iceberg table data on.
+
+Once that is configured, you can create a model backed by an Iceberg table by using `table_format iceberg` like so:
+
+```sql linenums="1" hl_lines="4 6-7"
+MODEL (
+  name schema_name.model_name,
+  kind FULL,
+  table_format iceberg,
+  physical_properties (
+    catalog = 'snowflake',
+    external_volume = '<external volume name>'
+  )
+);
+```
+
+To prevent having to specify `catalog = 'snowflake'` and `external_volume = '<external volume name>'` on every model, see the Snowflake documentation for:
+
+- [Configuring a default Catalog](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration#set-a-default-catalog-at-the-account-database-or-schema-level)
+- [Configuring a default External Volume](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume#set-a-default-external-volume-at-the-account-database-or-schema-level)
+
+Alternatively you can also use [model defaults](../../guides/configuration.md#model-defaults) to set defaults at the SQLMesh level instead.
+
+To utilize the wide variety of [optional properties](https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table-snowflake#optional-parameters) that Snowflake makes available for Iceberg tables, simply specify them as `physical_properties`:
+
+```sql linenums="1" hl_lines="8"
+MODEL (
+  name schema_name.model_name,
+  kind FULL,
+  table_format iceberg,
+  physical_properties (
+    catalog = 'snowflake',
+    external_volume = 'my_external_volume',
+    base_location = 'my/product_reviews/'
+  )
+);
+```
+
+!!! warning "External catalogs"
+
+    Setting `catalog = 'snowflake'` to use Snowflake's internal catalog is a good default because SQLMesh needs to be able to write to the tables it's managing and Snowflake [does not support](https://docs.snowflake.com/en/user-guide/tables-iceberg#catalog-options) writing to Iceberg tables configured under external catalogs.
+
+    You can however still reference a table from an external catalog in your model as a normal [external table](../../concepts/models/external_models.md).
+
 ## Troubleshooting
 
 ### Frequent Authentication Prompts

sqlmesh/core/engine_adapter/snowflake.py

Lines changed: 34 additions & 1 deletion
@@ -112,6 +112,38 @@ def snowpark(self) -> t.Optional[SnowparkSession]:
     def catalog_support(self) -> CatalogSupport:
         return CatalogSupport.FULL_SUPPORT
 
+    def _create_table(
+        self,
+        table_name_or_schema: t.Union[exp.Schema, TableName],
+        expression: t.Optional[exp.Expression],
+        exists: bool = True,
+        replace: bool = False,
+        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
+        table_description: t.Optional[str] = None,
+        column_descriptions: t.Optional[t.Dict[str, str]] = None,
+        table_kind: t.Optional[str] = None,
+        **kwargs: t.Any,
+    ) -> None:
+        table_format = kwargs.get("table_format")
+        if table_format and isinstance(table_format, str):
+            table_format = table_format.upper()
+            if not table_kind:
+                table_kind = f"{table_format} TABLE"
+            elif table_kind == self.MANAGED_TABLE_KIND:
+                table_kind = f"DYNAMIC {table_format} TABLE"
+
+        super()._create_table(
+            table_name_or_schema=table_name_or_schema,
+            expression=expression,
+            exists=exists,
+            replace=replace,
+            columns_to_types=columns_to_types,
+            table_description=table_description,
+            column_descriptions=column_descriptions,
+            table_kind=table_kind,
+            **kwargs,
+        )
+
     def create_managed_table(
         self,
         table_name: TableName,
@@ -230,7 +262,8 @@ def _build_table_properties_exp(
         if table_properties:
             table_properties = {k.upper(): v for k, v in table_properties.items()}
             # if we are creating a non-dynamic table; remove any properties that are only valid for dynamic tables
-            if table_kind != self.MANAGED_TABLE_KIND:
+            # this is necessary because we create "normal" tables from the same managed model definition for dev previews and the "normal" tables dont support these parameters
+            if "DYNAMIC" not in (table_kind or "").upper():
                 for prop in {"WAREHOUSE", "TARGET_LAG", "REFRESH_MODE", "INITIALIZE"}:
                     table_properties.pop(prop, None)
 
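
The two changes to `sqlmesh/core/engine_adapter/snowflake.py` interact: `_create_table` folds the table format into the table kind, and `_build_table_properties_exp` then decides from that kind whether dynamic-only properties survive. The following is a minimal standalone sketch of that logic, not the adapter itself. `resolve_table_kind` and `strip_dynamic_only_properties` are hypothetical helper names, and `MANAGED_TABLE_KIND = "DYNAMIC TABLE"` is an assumption made for illustration; its real value is not shown in this diff.

```python
import typing as t

# Assumption: stands in for SnowflakeEngineAdapter.MANAGED_TABLE_KIND,
# whose actual value does not appear in the diff.
MANAGED_TABLE_KIND = "DYNAMIC TABLE"

# Properties the diff treats as valid only for dynamic tables.
DYNAMIC_ONLY_PROPERTIES = {"WAREHOUSE", "TARGET_LAG", "REFRESH_MODE", "INITIALIZE"}


def resolve_table_kind(
    table_kind: t.Optional[str], table_format: t.Optional[str]
) -> t.Optional[str]:
    """Hypothetical helper mirroring the branch added to `_create_table`."""
    if table_format and isinstance(table_format, str):
        table_format = table_format.upper()
        if not table_kind:
            return f"{table_format} TABLE"
        if table_kind == MANAGED_TABLE_KIND:
            return f"DYNAMIC {table_format} TABLE"
    # Any other kind (or no format at all) is passed through unchanged.
    return table_kind


def strip_dynamic_only_properties(
    table_kind: t.Optional[str], table_properties: t.Dict[str, t.Any]
) -> t.Dict[str, t.Any]:
    """Hypothetical helper mirroring the filter in `_build_table_properties_exp`."""
    props = {k.upper(): v for k, v in table_properties.items()}
    # Substring check, as in the diff, so any kind containing DYNAMIC keeps them.
    if "DYNAMIC" not in (table_kind or "").upper():
        for prop in DYNAMIC_ONLY_PROPERTIES:
            props.pop(prop, None)
    return props


kind = resolve_table_kind(MANAGED_TABLE_KIND, "iceberg")
print(kind)
print(strip_dynamic_only_properties(kind, {"target_lag": "20 minutes"}))
```

Under the stated assumption, this also suggests why the filter moved from an equality test to a substring test: a derived kind such as `DYNAMIC ICEBERG TABLE` no longer equals the managed kind, so an equality check would strip `TARGET_LAG` from a table that needs it.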

sqlmesh/core/snapshot/evaluator.py

Lines changed: 2 additions & 0 deletions
@@ -2098,6 +2098,7 @@ def create(
                 table_properties=kwargs.get("physical_properties", model.physical_properties),
                 table_description=model.description,
                 column_descriptions=model.column_descriptions,
+                table_format=model.table_format,
             )
         elif not is_table_deployable:
             # Only create the dev preview table as a normal table.
@@ -2134,6 +2135,7 @@ def insert(
                 table_properties=kwargs.get("physical_properties", model.physical_properties),
                 table_description=model.description,
                 column_descriptions=model.column_descriptions,
+                table_format=model.table_format,
             )
         elif not is_snapshot_deployable:
             # Snapshot isnt deployable; update the preview table instead

tests/core/engine_adapter/integration/test_integration_snowflake.py

Lines changed: 47 additions & 0 deletions
@@ -163,3 +163,50 @@ def _get_data_object(table: exp.Table) -> DataObject:
 
     metadata = _get_data_object(target_table_1)
     assert not metadata.is_clustered
+
+
+def test_create_iceberg_table(ctx: TestContext, engine_adapter: SnowflakeEngineAdapter) -> None:
+    # Note: this test relies on a default Catalog and External Volume being configured in Snowflake
+    # ref: https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration#set-a-default-catalog-at-the-account-database-or-schema-level
+    # ref: https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume#set-a-default-external-volume-at-the-account-database-or-schema-level
+    # This has been done on the Snowflake account used by CI
+
+    model_name = ctx.table("TEST")
+    managed_model_name = ctx.table("TEST_DYNAMIC")
+    sqlmesh = ctx.create_context()
+
+    model = load_sql_based_model(
+        d.parse(f"""
+        MODEL (
+            name {model_name},
+            kind FULL,
+            table_format iceberg,
+            dialect 'snowflake'
+        );
+
+        select 1 as "ID", 'foo' as "NAME";
+        """)
+    )
+
+    managed_model = load_sql_based_model(
+        d.parse(f"""
+        MODEL (
+            name {managed_model_name},
+            kind MANAGED,
+            physical_properties (
+                target_lag = '20 minutes'
+            ),
+            table_format iceberg,
+            dialect 'snowflake'
+        );
+
+        select "ID", "NAME" from {model_name};
+        """)
+    )
+
+    sqlmesh.upsert_model(model)
+    sqlmesh.upsert_model(managed_model)
+
+    result = sqlmesh.plan(auto_apply=True)
+
+    assert len(result.new_snapshots) == 2

tests/core/engine_adapter/test_snowflake.py

Lines changed: 62 additions & 0 deletions
@@ -19,6 +19,13 @@
 pytestmark = [pytest.mark.engine, pytest.mark.snowflake]
 
 
+@pytest.fixture
+def snowflake_mocked_engine_adapter(
+    make_mocked_engine_adapter: t.Callable,
+) -> SnowflakeEngineAdapter:
+    return make_mocked_engine_adapter(SnowflakeEngineAdapter)
+
+
 def test_get_temp_table(mocker: MockerFixture, make_mocked_engine_adapter: t.Callable):
     adapter = make_mocked_engine_adapter(SnowflakeEngineAdapter)
 
@@ -270,11 +277,25 @@ def test_create_managed_table(make_mocked_engine_adapter: t.Callable, mocker: Mo
         },
     )
 
+    # table_format=iceberg
+    adapter.create_managed_table(
+        table_name="test_table",
+        query=query,
+        columns_to_types=columns_to_types,
+        table_properties={
+            "target_lag": exp.Literal.string("20 minutes"),
+            "catalog": exp.Literal.string("snowflake"),
+            "external_volume": exp.Literal.string("test"),
+        },
+        table_format="iceberg",
+    )
+
     assert to_sql_calls(adapter) == [
         """CREATE OR REPLACE DYNAMIC TABLE "test_table" TARGET_LAG='20 minutes' WAREHOUSE="default_warehouse" AS SELECT CAST("a" AS INT) AS "a", CAST("b" AS INT) AS "b" FROM (SELECT "a", "b" FROM "source_table") AS "_subquery\"""",
         """CREATE OR REPLACE DYNAMIC TABLE "test_table" TARGET_LAG='20 minutes' WAREHOUSE="foo" AS SELECT CAST("a" AS INT) AS "a", CAST("b" AS INT) AS "b" FROM (SELECT "a", "b" FROM "source_table") AS "_subquery\"""",
         """CREATE OR REPLACE DYNAMIC TABLE "test_table" CLUSTER BY ("a") TARGET_LAG='20 minutes' WAREHOUSE="default_warehouse" AS SELECT CAST("a" AS INT) AS "a", CAST("b" AS INT) AS "b" FROM (SELECT "a", "b" FROM "source_table") AS "_subquery\"""",
         """CREATE OR REPLACE DYNAMIC TABLE "test_table" TARGET_LAG='20 minutes' REFRESH_MODE='auto' INITIALIZE='on_create' WAREHOUSE="default_warehouse" AS SELECT CAST("a" AS INT) AS "a", CAST("b" AS INT) AS "b" FROM (SELECT "a", "b" FROM "source_table") AS "_subquery\"""",
+        """CREATE OR REPLACE DYNAMIC ICEBERG TABLE "test_table" TARGET_LAG='20 minutes' CATALOG='snowflake' EXTERNAL_VOLUME='test' WAREHOUSE="default_warehouse" AS SELECT CAST("a" AS INT) AS "a", CAST("b" AS INT) AS "b" FROM (SELECT "a", "b" FROM "source_table") AS "_subquery\"""",
     ]
 
 
@@ -666,3 +687,44 @@ def test_clone_table(mocker: MockerFixture, make_mocked_engine_adapter: t.Callab
     adapter.cursor.execute.assert_called_once_with(
         'CREATE TABLE "target_table" CLONE "source_table"'
     )
+
+
+def test_table_format_iceberg(snowflake_mocked_engine_adapter: SnowflakeEngineAdapter) -> None:
+    adapter = snowflake_mocked_engine_adapter
+
+    model = load_sql_based_model(
+        expressions=d.parse("""
+        MODEL (
+            name test.table,
+            kind full,
+            table_format iceberg,
+            physical_properties (
+                catalog = 'snowflake',
+                external_volume = 'test'
+            )
+        );
+        SELECT a::INT;
+        """)
+    )
+    assert isinstance(model, SqlModel)
+    assert model.table_format == "iceberg"
+
+    adapter.create_table(
+        table_name=model.name,
+        columns_to_types=model.columns_to_types_or_raise,
+        table_format=model.table_format,
+        table_properties=model.physical_properties,
+    )
+
+    adapter.ctas(
+        table_name=model.name,
+        query_or_df=model.render_query_or_raise(),
+        columns_to_types=model.columns_to_types_or_raise,
+        table_format=model.table_format,
+        table_properties=model.physical_properties,
+    )
+
+    assert to_sql_calls(adapter) == [
+        'CREATE ICEBERG TABLE IF NOT EXISTS "test"."table" ("a" INT) CATALOG=\'snowflake\' EXTERNAL_VOLUME=\'test\'',
+        'CREATE ICEBERG TABLE IF NOT EXISTS "test"."table" CATALOG=\'snowflake\' EXTERNAL_VOLUME=\'test\' AS SELECT CAST("a" AS INT) AS "a" FROM (SELECT CAST("a" AS INT) AS "a") AS "_subquery"',
+    ]

tests/core/test_snapshot_evaluator.py

Lines changed: 1 addition & 0 deletions
@@ -3367,6 +3367,7 @@ def test_create_managed(adapter_mock, make_snapshot, mocker: MockerFixture):
         table_properties=model.physical_properties,
         table_description=model.description,
         column_descriptions=model.column_descriptions,
+        table_format=None,
     )
 
 