Commit f258b5e

* Refactoring
* Making optional but failing requirements mandatory
* Fixing unit tests
* Documentation updates

Signed-off-by: Mateusz Jukiewicz <mateusz@marketer.tech>
1 parent 9dec6d9 commit f258b5e

8 files changed

Lines changed: 393 additions & 80 deletions

docs/integrations/engines/starrocks.md

Lines changed: 6 additions & 4 deletions
@@ -79,7 +79,7 @@ FROM source.user_events;

 A `DUPLICATE KEY` table can usually be used as a `FULL` kind model.

-### 2) An incremental table (PRIMARY KEY recommended)
+### 2) An incremental table (PRIMARY KEY required)

 ```sql
 MODEL (
@@ -129,9 +129,11 @@ MODEL (

 ### PRIMARY KEY Type

-For incremental models, **PRIMARY KEY tables are needed** (and effectively required for robust deletes) because StarRocks supports *weaker* `DELETE ... WHERE ...` on non-primary-key table types.
+For incremental models, a **PRIMARY KEY table is mandatory**. StarRocks only supports the full `DELETE ... WHERE ...` and `MERGE` semantics that incremental kinds rely on (such as `INCREMENTAL_BY_TIME_RANGE`, `INCREMENTAL_BY_UNIQUE_KEY`, `INCREMENTAL_BY_PARTITION`, and `SCD_TYPE_2`) on PRIMARY KEY tables. DUPLICATE KEY, UNIQUE KEY, and AGGREGATE KEY tables reject the predicates SQLMesh generates, such as a time-range `DELETE` with a `CAST` bound or any predicate on a non-key column.

-SQLMesh will apply conservative `WHERE` transformations for compatibility (for example, converting `BETWEEN` to `>= AND <=`, removing boolean literals, and converting `DELETE ... WHERE TRUE` to `TRUNCATE TABLE`). To avoid limitations and keep incremental maintenance reliable, use a `PRIMARY KEY` table by setting `physical_properties.primary_key`.
+SQLMesh enforces this: an incremental model on StarRocks without a primary key fails fast with a clear error. Set `physical_properties.primary_key`, for example `physical_properties (primary_key = (user_id, event_date))`. As a convenience, the `unique_key` of an `INCREMENTAL_BY_UNIQUE_KEY` model is automatically used as the table's PRIMARY KEY.
+
+SQLMesh also applies conservative `WHERE` transformations for compatibility (for example, converting `BETWEEN` to `>= AND <=`, removing boolean literals, and converting `DELETE ... WHERE TRUE` to `TRUNCATE TABLE`).

 > SQLMesh currently does not support specifying `primary_key` as a model parameter.
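The `WHERE` rewrites listed in the documentation hunk above can be illustrated with a small, self-contained sketch. This is not SQLMesh's implementation: the `Cmp`, `Between`, `And`, and `TrueLit` classes and the `simplify`, `render`, and `delete_statement` helpers are hypothetical names invented for this example, and predicates are modelled as plain dataclasses rather than real SQL expressions.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Cmp:
    column: str
    op: str
    value: str  # an already-rendered SQL literal


@dataclass(frozen=True)
class Between:
    column: str
    low: str
    high: str


@dataclass(frozen=True)
class And:
    left: "Pred"
    right: "Pred"


@dataclass(frozen=True)
class TrueLit:
    pass


Pred = Union[Cmp, Between, And, TrueLit]


def simplify(pred: Pred) -> Pred:
    """Rewrite a predicate into the conservative form described in the docs."""
    if isinstance(pred, Between):
        # col BETWEEN low AND high  ->  col >= low AND col <= high
        return And(Cmp(pred.column, ">=", pred.low), Cmp(pred.column, "<=", pred.high))
    if isinstance(pred, And):
        left, right = simplify(pred.left), simplify(pred.right)
        # Drop boolean literals: `TRUE AND x` is just `x`.
        if isinstance(left, TrueLit):
            return right
        if isinstance(right, TrueLit):
            return left
        return And(left, right)
    return pred


def render(pred: Pred) -> str:
    if isinstance(pred, Cmp):
        return f"{pred.column} {pred.op} {pred.value}"
    if isinstance(pred, And):
        return f"{render(pred.left)} AND {render(pred.right)}"
    raise ValueError(f"unexpected predicate after simplification: {pred!r}")


def delete_statement(table: str, where: Pred) -> str:
    where = simplify(where)
    if isinstance(where, TrueLit):
        # DELETE ... WHERE TRUE removes every row, so TRUNCATE is used instead.
        return f"TRUNCATE TABLE {table}"
    return f"DELETE FROM {table} WHERE {render(where)}"
```

For instance, `delete_statement("t", TrueLit())` yields a `TRUNCATE TABLE` statement, while a `Between` predicate is rendered as a pair of `>=` and `<=` comparisons.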
@@ -160,7 +162,7 @@ GROUP BY user_id, event_date;

 ### UNIQUE KEY Type

-You can create a UNIQUE KEY table by setting `physical_properties.unique_key`. In most incremental use cases, a PRIMARY KEY table is recommended instead.
+You can create a UNIQUE KEY table by setting `physical_properties.unique_key`. Note that a UNIQUE KEY table is **not** sufficient for incremental models: incremental kinds require a PRIMARY KEY table (see [PRIMARY KEY Type](#primary-key-type)).

 **Example:**

sqlmesh/core/engine_adapter/base.py

Lines changed: 29 additions & 0 deletions
@@ -2728,6 +2728,35 @@ def _build_clustered_by_exp(
     ) -> t.Optional[exp.Cluster]:
         return None

+    def adjust_physical_properties_for_incremental(
+        self,
+        physical_properties: t.Dict[str, t.Any],
+        *,
+        requires_delete_capable_table: bool,
+        unique_key: t.Optional[t.List[exp.Expr]],
+        model_name: str,
+    ) -> t.Dict[str, t.Any]:
+        """Adjusts physical properties for an incremental model before the table is created.
+
+        Some engines require a specific physical table layout before they can run the DELETE/MERGE
+        statements that incremental model kinds rely on (e.g. StarRocks only supports those on
+        PRIMARY KEY tables). This hook lets each engine derive or validate the required properties
+        while keeping the generic evaluator free of engine-specific branching.
+
+        Args:
+            physical_properties: The model's physical properties.
+            requires_delete_capable_table: Whether the model kind issues DELETE/MERGE statements
+                (as opposed to append-only INSERTs), as determined by the generic evaluator.
+            unique_key: The model's unique key, populated only when the kind allows promoting it to
+                an engine-specific key (i.e. INCREMENTAL_BY_UNIQUE_KEY); otherwise None.
+            model_name: The model name, for use in diagnostics.
+
+        Returns:
+            The (possibly adjusted) physical properties. Implementations own the given mapping and
+            may mutate it in place; the base implementation returns it unchanged.
+        """
+        return physical_properties
+
     def _build_table_properties_exp(
         self,
         catalog_name: t.Optional[str] = None,
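The hook added in this hunk follows a common pattern: a pass-through default on the base class, with engine subclasses overriding it. The sketch below shows that shape in isolation. `BaseAdapter`, `KeyedAdapter`, and `properties_for` are hypothetical stand-ins written for this example, keys are plain strings rather than sqlglot expressions, and the override is deliberately simpler than the StarRocks one in this commit.

```python
from typing import Any, Dict, List, Optional


class BaseAdapter:
    """Stand-in for the generic engine adapter (hypothetical name)."""

    def adjust_physical_properties_for_incremental(
        self,
        physical_properties: Dict[str, Any],
        *,
        requires_delete_capable_table: bool,
        unique_key: Optional[List[str]],
        model_name: str,
    ) -> Dict[str, Any]:
        # Default: engines with no layout requirement pass the properties through.
        return physical_properties


class KeyedAdapter(BaseAdapter):
    """Stand-in for an engine that wants a key on delete-capable tables."""

    def adjust_physical_properties_for_incremental(
        self,
        physical_properties: Dict[str, Any],
        *,
        requires_delete_capable_table: bool,
        unique_key: Optional[List[str]],
        model_name: str,
    ) -> Dict[str, Any]:
        needs_key = requires_delete_capable_table and "primary_key" not in physical_properties
        if needs_key and unique_key:
            physical_properties["primary_key"] = tuple(unique_key)
        return physical_properties


def properties_for(adapter: BaseAdapter, props: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    # The caller hands over a copy, so an override may mutate it safely.
    return adapter.adjust_physical_properties_for_incremental(
        dict(props or {}),
        requires_delete_capable_table=True,
        unique_key=["id"],
        model_name="demo.model",
    )
```

The caller never checks which engine it is talking to. It always calls the hook, and only adapters that need a particular table layout do anything with the arguments.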

sqlmesh/core/engine_adapter/starrocks.py

Lines changed: 66 additions & 15 deletions
@@ -1672,8 +1672,11 @@ class StarRocksEngineAdapter(
     TODO: later, we can add support for INSERT OVERWRITE, even use Primary Key for better performance
     """

-    COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_CTAS
-    """Table comments are added in both CREATE TABLE statement and CTAS"""
+    COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS
+    """Column comments are added inline in a plain CREATE TABLE, but StarRocks CTAS only accepts a
+    bare column-name list (no types or per-column COMMENT) before AS SELECT. So for CTAS we emit
+    `CREATE TABLE t COMMENT '...' AS SELECT ...` (table comment only) and register column comments
+    afterward via ALTER TABLE ... MODIFY COLUMN ... COMMENT (see _build_create_comment_column_exp)."""

     COMMENT_CREATION_VIEW = CommentCreationView.IN_SCHEMA_DEF_NO_COMMANDS
     """View comments are added in CREATE VIEW statement"""
@@ -2105,6 +2108,48 @@ def execute(
             **kwargs,
         )

+    def adjust_physical_properties_for_incremental(
+        self,
+        physical_properties: t.Dict[str, t.Any],
+        *,
+        requires_delete_capable_table: bool,
+        unique_key: t.Optional[t.List[exp.Expr]],
+        model_name: str,
+    ) -> t.Dict[str, t.Any]:
+        """Enforce that StarRocks incremental models use a PRIMARY KEY table.
+
+        Incremental kinds rely on DELETE/MERGE statements that StarRocks only supports on PRIMARY
+        KEY tables; DUPLICATE/UNIQUE/AGGREGATE KEY tables reject the predicates SQLMesh generates
+        (e.g. a time-range DELETE with a CAST bound, or any non-key-column predicate). When a
+        unique_key is available (INCREMENTAL_BY_UNIQUE_KEY) we promote it to a PRIMARY KEY;
+        otherwise a PRIMARY KEY must be specified explicitly via physical_properties, and we raise
+        so the failure is clear at creation time rather than producing a broken table.
+
+        The caller owns ``physical_properties`` (it is already a defensive copy), so we mutate and
+        return it in place.
+        """
+        if not requires_delete_capable_table or "primary_key" in physical_properties:
+            return physical_properties
+
+        # Promote the model's unique_key to a PRIMARY KEY table so that complex DELETE/MERGE
+        # statements remain supported.
+        if unique_key:
+            physical_properties["primary_key"] = (
+                unique_key[0] if len(unique_key) == 1 else exp.Tuple(expressions=unique_key)
+            )
+            logger.info(
+                "Model '%s' promoted to PRIMARY KEY table on StarRocks to support rich DELETE operations.",
+                model_name,
+            )
+            return physical_properties
+
+        raise SQLMeshError(
+            f"StarRocks incremental model '{model_name}' requires a PRIMARY KEY table. "
+            "Incremental kinds use DELETE/MERGE operations that StarRocks only supports on PRIMARY KEY "
+            "tables; DUPLICATE/UNIQUE/AGGREGATE KEY tables are not sufficient. "
+            "Specify `physical_properties (primary_key = (...))`, or set `unique_key` on the model."
+        )
+
     # ==================== Table Creation (CORE IMPLEMENTATION) ====================
     def _create_table_from_columns(
         self,
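The StarRocks override above has three possible outcomes: pass the properties through, promote the unique key, or raise. The sketch below reproduces that decision flow without sqlglot or SQLMesh. `enforce_primary_key` and `MissingPrimaryKeyError` are hypothetical names for this example, and a composite key is modelled as a Python tuple where the real code builds an `exp.Tuple`.

```python
from typing import Any, Dict, List, Optional


class MissingPrimaryKeyError(Exception):
    """Hypothetical stand-in for SQLMeshError."""


def enforce_primary_key(
    props: Dict[str, Any],
    *,
    requires_delete_capable_table: bool,
    unique_key: Optional[List[str]],
    model_name: str,
) -> Dict[str, Any]:
    # 1. Append-only kinds, or an explicit primary_key: nothing to do.
    if not requires_delete_capable_table or "primary_key" in props:
        return props

    # 2. A unique_key is available: promote it. A single column stays scalar,
    #    several columns become a tuple.
    if unique_key:
        props["primary_key"] = unique_key[0] if len(unique_key) == 1 else tuple(unique_key)
        return props

    # 3. Otherwise fail at creation time instead of building an unusable table.
    raise MissingPrimaryKeyError(
        f"incremental model '{model_name}' requires a PRIMARY KEY table"
    )
```

Compared with the removed helper in `evaluator.py`, which only logged a warning when no key was available, the third branch turns a late runtime failure into an immediate one.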
@@ -2546,10 +2591,18 @@ def _build_table_properties_exp(
             properties.append(distributed_prop)

         # 5. Handle refresh_property (REFRESH ...)
+        # StarRocks only supports ASYNC materialized views, which require a REFRESH clause.
+        # Synchronous MVs are not supported, so a missing refresh is a hard error rather than
+        # a silent fallback (which would create an undetectable sync MV).
         if is_mv:
             refresh_prop = self._build_refresh_property(table_properties_copy)
-            if refresh_prop:
-                properties.append(refresh_prop)
+            if refresh_prop is None:
+                raise SQLMeshError(
+                    "StarRocks materialized views require a REFRESH clause. "
+                    "Specify at least one of 'refresh_moment' or 'refresh_scheme' in the model's "
+                    "physical_properties (e.g. refresh_scheme = 'ASYNC')."
+                )
+            properties.append(refresh_prop)

         # 6. Handle order_by/clustered_by (ORDER BY ...)
         order_prop = self._build_order_by_property(table_properties_copy, clustered_by or None)
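The behavioural change in this hunk is that a missing refresh setting used to be skipped silently and now raises. A minimal sketch of that fail-fast check follows. `build_refresh_clause` and `mv_properties` are hypothetical helpers, and the rendering of the clause as `REFRESH <moment> <scheme>` is an assumption made for illustration, not the adapter's actual output.

```python
from typing import Any, Dict, List, Optional


def build_refresh_clause(props: Dict[str, Any]) -> Optional[str]:
    """Return a REFRESH clause, or None when neither property is set."""
    moment = props.get("refresh_moment")
    scheme = props.get("refresh_scheme")
    if moment is None and scheme is None:
        return None
    parts = ["REFRESH"]
    if moment is not None:
        parts.append(str(moment))
    if scheme is not None:
        parts.append(str(scheme))
    # Assumed rendering: REFRESH [moment] [scheme]
    return " ".join(parts)


def mv_properties(props: Dict[str, Any]) -> List[str]:
    clause = build_refresh_clause(props)
    if clause is None:
        # Hard error instead of quietly emitting a view without a REFRESH clause.
        raise ValueError(
            "materialized views require a REFRESH clause; "
            "set refresh_moment or refresh_scheme"
        )
    return [clause]
```

Checking `is None` rather than truthiness, as the diff does, keeps "no clause could be built" distinct from any falsy but valid value.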
@@ -3314,9 +3367,9 @@ def _build_create_comment_table_exp(
         StarRocks uses non-standard syntax for table comments:
             ALTER TABLE {table} COMMENT = '{comment}'

-        Note: This method is typically NOT called for StarRocks because:
-        - COMMENT_CREATION_TABLE = IN_SCHEMA_DEF_CTAS
-        - Comments are included directly in CREATE TABLE via SchemaCommentProperty
+        Note: This method is typically NOT called for StarRocks because the table comment is
+        included directly in CREATE TABLE (and CTAS) via SchemaCommentProperty, which StarRocks
+        accepts even for `CREATE TABLE ... COMMENT '...' AS SELECT`.

         However, this override is provided for potential future use cases:
         - Modifying comments on existing tables via ALTER TABLE
@@ -3346,15 +3399,13 @@ def _build_create_comment_column_exp(
         """
         Build ALTER TABLE MODIFY COLUMN SQL for column comment modification.

-        StarRocks requires column type in MODIFY COLUMN statement:
-            ALTER TABLE {table} MODIFY COLUMN {column} {type} COMMENT '{comment}'
-
-        Note: This method is typically NOT called for StarRocks because:
-        - COMMENT_CREATION_TABLE = IN_SCHEMA_DEF_CTAS
-        - Column comments are included directly in CREATE TABLE DDL
+        StarRocks accepts the comment without re-stating the column type:
+            ALTER TABLE {table} MODIFY COLUMN {column} COMMENT '{comment}'

-        However, this override is provided for potential future use cases:
-        - Modifying column comments on existing tables via ALTER TABLE
+        Because COMMENT_CREATION_TABLE = IN_SCHEMA_DEF_NO_CTAS, column comments are inlined for a
+        plain CREATE TABLE but NOT for CTAS (StarRocks rejects types/comments in a CTAS column
+        list). This method is therefore the fallback used to register column comments after a CTAS,
+        and to modify column comments on existing tables.

         Args:
             table: Table expression
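The docstrings above describe a two-step flow for CTAS: the table comment travels with the `CREATE TABLE ... AS SELECT`, and column comments follow as separate `ALTER TABLE` statements. The sketch below builds that statement sequence as plain strings. `quote_literal`, `column_comment_statements`, and `ctas_with_comments` are hypothetical helpers, and escaping a single quote by doubling it is an assumption of this example rather than something the diff specifies.

```python
from typing import Dict, List


def quote_literal(text: str) -> str:
    # Assumption: a single quote inside the literal is escaped by doubling it.
    return "'" + text.replace("'", "''") + "'"


def column_comment_statements(table: str, comments: Dict[str, str]) -> List[str]:
    # One statement per column, without re-stating the column type.
    return [
        f"ALTER TABLE {table} MODIFY COLUMN {column} COMMENT {quote_literal(comment)}"
        for column, comment in comments.items()
    ]


def ctas_with_comments(
    table: str,
    select_sql: str,
    table_comment: str,
    column_comments: Dict[str, str],
) -> List[str]:
    # Step 1: CTAS carries only the table-level comment.
    create = f"CREATE TABLE {table} COMMENT {quote_literal(table_comment)} AS {select_sql}"
    # Step 2: column comments are registered afterwards.
    return [create, *column_comment_statements(table, column_comments)]
```

For a plain `CREATE TABLE` with an explicit schema, the second step is unnecessary because the column comments are already inlined in the column definitions.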

sqlmesh/core/snapshot/evaluator.py

Lines changed: 41 additions & 45 deletions
@@ -2045,38 +2045,36 @@ def run_post_statements(self, snapshot: Snapshot, render_kwargs: t.Any) -> None:
         self.adapter.execute(snapshot.model.render_post_statements(**render_kwargs))


-def _ensure_primary_key_for_starrocks_when_incremental_by_unique_key(
-    model: Model, physical_properties: t.Optional[t.Dict[str, t.Any]]
+def _adjust_physical_properties_for_engine(
+    adapter: EngineAdapter,
+    model: Model,
+    physical_properties: t.Optional[t.Dict[str, t.Any]],
 ) -> t.Dict[str, t.Any]:
-    """
-    Promote StarRocks incremental-by-unique-key models to PRIMARY KEY tables so that
-    complex DELETE/MERGE statements remain supported.
-    """
+    """Let the target engine adjust/validate physical properties for an incremental model.

-    properties = dict(physical_properties or {})
-
-    if (
-        model.dialect != "starrocks"
-        or not model.kind.is_incremental_by_unique_key
-        or "primary_key" in properties
-    ):
-        return properties
-    unique_key: t.Optional[t.List[exp.Expr]] = model.unique_key
-    if unique_key:
-        properties["primary_key"] = (
-            unique_key[0] if len(unique_key) == 1 else exp.Tuple(expressions=unique_key)
-        )
-        logger.info(
-            "Model '%s' promoted to PRIMARY KEY table on StarRocks to support rich DELETE operations.",
-            model.name,
-        )
-    else:
-        logger.warning(
-            f"StarRocks incremental-by-unique-key model '{model.name}' requires a PRIMARY KEY table. "
-            f"Specify `physical_properties['primary_key']` or set `unique_key` on the model.",
-        )
+    The generic responsibility here is to determine, from the model kind, whether the table will
+    be the target of DELETE/MERGE statements (vs. append-only INSERTs) and whether its unique_key
+    may be promoted to an engine-specific key. The engine adapter decides what, if anything, to do
+    with that information (see ``EngineAdapter.adjust_physical_properties_for_incremental``).
+    """
+    kind = model.kind
+
+    # Only incremental kinds that issue DELETE/MERGE need a delete-capable table. Append-only
+    # INCREMENTAL_UNMANAGED (insert_overwrite=False) only does INSERT, so it does not.
+    requires_delete_capable_table = (
+        kind.is_incremental_by_time_range
+        or kind.is_incremental_by_unique_key
+        or kind.is_incremental_by_partition
+        or kind.is_scd_type_2
+        or (isinstance(kind, IncrementalUnmanagedKind) and kind.insert_overwrite)
+    )

-    return properties
+    return adapter.adjust_physical_properties_for_incremental(
+        dict(physical_properties or {}),
+        requires_delete_capable_table=requires_delete_capable_table,
+        unique_key=model.unique_key if kind.is_incremental_by_unique_key else None,
+        model_name=model.name,
+    )


 class MaterializableStrategy(PromotableStrategy, abc.ABC):
class MaterializableStrategy(PromotableStrategy, abc.ABC):
@@ -2090,9 +2088,8 @@ def create(
         **kwargs: t.Any,
     ) -> None:
         ctas_query = model.ctas_query(**render_kwargs)
-        physical_properties = kwargs.get("physical_properties", model.physical_properties)
-        physical_properties = _ensure_primary_key_for_starrocks_when_incremental_by_unique_key(
-            model, physical_properties
+        physical_properties = _adjust_physical_properties_for_engine(
+            self.adapter, model, kwargs.get("physical_properties", model.physical_properties)
         )

         logger.info("Creating table '%s'", table_name)
@@ -2208,9 +2205,8 @@ def _replace_query_for_model(
         except Exception:
             columns_to_types, source_columns = None, None

-        physical_properties = kwargs.get("physical_properties", model.physical_properties)
-        physical_properties = _ensure_primary_key_for_starrocks_when_incremental_by_unique_key(
-            model, physical_properties
+        physical_properties = _adjust_physical_properties_for_engine(
+            self.adapter, model, kwargs.get("physical_properties", model.physical_properties)
         )
         self.adapter.replace_query(
             name,
@@ -2354,9 +2350,8 @@ def insert(
                 table_name,
                 render_kwargs=render_kwargs,
             )
-            physical_properties = kwargs.get("physical_properties", model.physical_properties)
-            physical_properties = _ensure_primary_key_for_starrocks_when_incremental_by_unique_key(
-                model, physical_properties
+            physical_properties = _adjust_physical_properties_for_engine(
+                self.adapter, model, kwargs.get("physical_properties", model.physical_properties)
             )
             self.adapter.merge(
                 table_name,
@@ -2384,9 +2379,8 @@ def append(
         columns_to_types, source_columns = self._get_target_and_source_columns(
             model, table_name, render_kwargs=render_kwargs
         )
-        physical_properties = kwargs.get("physical_properties", model.physical_properties)
-        physical_properties = _ensure_primary_key_for_starrocks_when_incremental_by_unique_key(
-            model, physical_properties
+        physical_properties = _adjust_physical_properties_for_engine(
+            self.adapter, model, kwargs.get("physical_properties", model.physical_properties)
         )
         self.adapter.merge(
             table_name,
@@ -2594,6 +2588,9 @@ def create(
             columns_to_types = model.columns_to_types_or_raise
             if isinstance(model.kind, SCDType2ByTimeKind):
                 columns_to_types[model.kind.updated_at_name.name] = model.kind.time_data_type
+            physical_properties = _adjust_physical_properties_for_engine(
+                self.adapter, model, kwargs.get("physical_properties", model.physical_properties)
+            )
             self.adapter.create_table(
                 table_name,
                 target_columns_to_types=columns_to_types,
@@ -2602,7 +2599,7 @@ def create(
                 partitioned_by=model.partitioned_by,
                 partition_interval_unit=model.partition_interval_unit,
                 clustered_by=model.clustered_by,
-                table_properties=kwargs.get("physical_properties", model.physical_properties),
+                table_properties=physical_properties,
                 table_description=model.description if is_table_deployable else None,
                 column_descriptions=model.column_descriptions if is_table_deployable else None,
             )
@@ -3179,9 +3176,8 @@ def create(
         if is_table_deployable and is_snapshot_deployable:
             # We could deploy this to prod; create a proper managed table
             logger.info("Creating managed table: %s", table_name)
-            physical_properties = kwargs.get("physical_properties", model.physical_properties)
-            physical_properties = _ensure_primary_key_for_starrocks_when_incremental_by_unique_key(
-                model, physical_properties
+            physical_properties = _adjust_physical_properties_for_engine(
+                self.adapter, model, kwargs.get("physical_properties", model.physical_properties)
             )
             self.adapter.create_managed_table(
                 table_name=table_name,
