Commit 10378c9

Docs update + StarRocks AMV with audits gets refreshed immediately
Signed-off-by: Mateusz Jukiewicz <mateusz@marketer.tech>
1 parent 33ed712 commit 10378c9

5 files changed

Lines changed: 250 additions & 5 deletions

docs/integrations/engines/starrocks.md

Lines changed: 28 additions & 0 deletions
@@ -485,6 +485,31 @@ FROM user_events
485485
GROUP BY user_id;
486486
```
487487

488+
**Audits on materialized views:**
489+
490+
Audits require data to exist in the materialized view when they run. Because StarRocks refreshes async MVs as background jobs, the data is not guaranteed to be present immediately after the MV is created. To make audits deterministic, SQLMesh issues a synchronous `REFRESH MATERIALIZED VIEW <name> WITH SYNC MODE` right after creating any materialized view that has audits; the statement blocks until the data is materialized.
491+
492+
For this to work safely, a materialized view with audits **must** set `refresh_moment = 'DEFERRED'`. This prevents StarRocks' automatic (IMMEDIATE) refresh from racing with the synchronous refresh that SQLMesh issues. If the MV has audits and `refresh_moment` is `IMMEDIATE` (or unset, which defaults to `IMMEDIATE` in StarRocks), SQLMesh raises an error before creating the MV.
493+
494+
```sql
495+
MODEL (
496+
name user_summary_mv,
497+
kind VIEW (
498+
materialized true
499+
),
500+
audits (
501+
not_null(columns := (user_id))
502+
),
503+
physical_properties (
504+
-- required when the MV has audits
505+
refresh_moment = DEFERRED,
506+
refresh_scheme = 'ASYNC'
507+
)
508+
);
509+
510+
SELECT user_id, COUNT(*) AS event_count FROM user_events GROUP BY user_id;
511+
```
512+
488513
**Other properties:**
489514

490515
You can specify `partitioning`, `distribution`, `order by` and `properties` the same way as for normal tables. Note that only properties StarRocks supports for MVs take effect; refer to the StarRocks documentation on MV creation.
@@ -512,6 +537,9 @@ target_columns_to_types = {
512537
## Limitations
513538

514539
* **No SYNC MV support**: synchronous materialized views are not supported yet.
540+
* **`FULL` models are not replaced atomically**: StarRocks does not support `CREATE OR REPLACE TABLE` and has no multi-statement transactions (in version 3.5 and lower). SQLMesh therefore refreshes a `FULL` model by emptying the existing table (a `TRUNCATE`, or a `DELETE` when a filter applies) and then inserting the new result set, as separate auto-committed statements. Between the truncate/delete and the completion of the insert, the table is empty or partially populated, so readers querying it during that window may see missing or incomplete data. Incremental kinds (e.g. `INCREMENTAL_BY_TIME_RANGE`, `INCREMENTAL_BY_PARTITION`) do not fully eliminate this window, because StarRocks applies them as the same non-atomic delete-then-insert, but they confine it to the partition/time range being processed rather than emptying the whole table, so unaffected partitions remain readable throughout. SQLMesh has no way to make these replacements atomic on StarRocks 3.5 and lower.
541+
542+
Future work: this PR targeted StarRocks 3.5, but StarRocks has since expanded its capabilities considerably (the integration now runs against 4.1). Later work should investigate using `INSERT OVERWRITE` together with the transactional/atomic-swap guarantees available in newer StarRocks versions to close this gap (see the `INSERT_OVERWRITE_STRATEGY` and `SUPPORTS_TRANSACTIONS` flags in the StarRocks engine adapter).
515543
* **No tuple IN**: StarRocks does not support `(c1, c2) IN ((v1, v2), ...)`.
516544
* **No `SELECT ... FOR UPDATE`**: StarRocks is an OLAP database and does not support row locks; SQLMesh removes `FOR UPDATE` when executing SQLGlot expressions.
517545
* **RENAME caveat**: `ALTER TABLE db.old RENAME db.new` is not supported; the `RENAME` target cannot be qualified with a database name.
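
The window described in the `FULL` models limitation above can be illustrated with a toy Python model. This is only a sketch: the in-memory list stands in for a StarRocks table, `observe` stands in for a concurrent reader, and the names `full_refresh` and `partition_refresh` are hypothetical, not part of SQLMesh.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]
Observer = Callable[[List[Row]], None]


def full_refresh(table: List[Row], new_rows: List[Row], observe: Observer) -> None:
    """Toy model of a non-atomic FULL refresh: empty the table, then insert."""
    table.clear()           # statement 1: TRUNCATE, auto-committed
    observe(table)          # a concurrent reader here sees an empty table
    table.extend(new_rows)  # statement 2: INSERT, auto-committed
    observe(table)          # only now is the new result set visible


def partition_refresh(
    table: List[Row], new_rows: List[Row], day: str, observe: Observer
) -> None:
    """Toy model of an incremental refresh: delete one partition, then insert."""
    table[:] = [row for row in table if row["day"] != day]  # DELETE for one partition
    observe(table)          # rows of other partitions stay readable in the window
    table.extend(new_rows)
    observe(table)


# Record how many rows a reader would see after each statement.
full_counts: List[int] = []
full_table: List[Row] = [{"day": "d1", "user_id": 1}, {"day": "d2", "user_id": 2}]
full_refresh(
    full_table,
    [{"day": "d1", "user_id": 1}, {"day": "d2", "user_id": 2}, {"day": "d2", "user_id": 3}],
    lambda t: full_counts.append(len(t)),
)

part_counts: List[int] = []
part_table: List[Row] = [{"day": "d1", "user_id": 1}, {"day": "d2", "user_id": 2}]
partition_refresh(
    part_table,
    [{"day": "d2", "user_id": 2}, {"day": "d2", "user_id": 3}],
    "d2",
    lambda t: part_counts.append(len(t)),
)
```

In this model the reader of the `FULL` table sees zero rows between the two statements, while the reader of the partition-refreshed table still sees the untouched `d1` row.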

sqlmesh/core/engine_adapter/starrocks.py

Lines changed: 48 additions & 0 deletions
@@ -2310,6 +2310,13 @@ def create_view(
23102310
)
23112311

23122312
# MATERIALIZED VIEW path
2313+
# MVs with audits get a synchronous refresh after creation (see _create_materialized_view),
2314+
# which requires REFRESH DEFERRED. Validate before the drop so we fail without destroying
2315+
# an existing MV.
2316+
has_audits = bool((materialized_properties or {}).get("has_audits"))
2317+
if has_audits:
2318+
self._validate_deferred_refresh_for_audits(view_name, view_properties)
2319+
23132320
if replace:
23142321
# Avoid DROP MATERIALIZED VIEW failure when an object with the same name exists but is not an MV.
23152322
self.drop_data_object_on_type_mismatch(
@@ -2431,6 +2438,16 @@ def _create_materialized_view(
24312438
quote_identifiers=self.QUOTE_IDENTIFIERS_IN_VIEWS,
24322439
)
24332440

2441+
# MVs with audits are created with REFRESH DEFERRED (enforced in create_view), so StarRocks
2442+
# does not populate them on creation. Audits need data, so block on a synchronous refresh.
2443+
if (materialized_properties or {}).get("has_audits"):
2444+
refresh_sql = (
2445+
f"REFRESH MATERIALIZED VIEW "
2446+
f"{exp.to_table(view_name).sql(dialect=self.dialect, identify=True)} "
2447+
f"WITH SYNC MODE"
2448+
)
2449+
self.execute(refresh_sql)
2450+
24342451
self._clear_data_object_cache(view_name)
24352452

24362453
def _build_materialized_view_schema_exp(
@@ -3020,6 +3037,37 @@ def _build_distributed_by_property(
30203037
)
30213038
return result
30223039

3040+
def _validate_deferred_refresh_for_audits(
3041+
self,
3042+
view_name: TableName,
3043+
view_properties: t.Optional[t.Dict[str, exp.Expr]],
3044+
) -> None:
3045+
"""
3046+
Ensure a materialized view with audits uses REFRESH DEFERRED.
3047+
3048+
StarRocks audits require data to exist in the MV, so SQLMesh issues an explicit synchronous
3049+
`REFRESH MATERIALIZED VIEW ... WITH SYNC MODE` right after creating the MV. For that to be
3050+
deterministic, the MV must use `refresh_moment = 'DEFERRED'`; otherwise StarRocks' automatic
3051+
(IMMEDIATE) refresh would run concurrently and race with the explicit one. A missing
3052+
refresh_moment defaults to IMMEDIATE in StarRocks, so it is rejected as well.
3053+
"""
3054+
refresh_moment = (view_properties or {}).get("refresh_moment")
3055+
normalized = (
3056+
PropertyValidator.validate_and_normalize_property("refresh_moment", refresh_moment)
3057+
if refresh_moment is not None
3058+
else None
3059+
)
3060+
if normalized != "DEFERRED":
3061+
raise SQLMeshError(
3062+
f"[StarRocks] Materialized view '{exp.to_table(view_name).sql(dialect=self.dialect)}' "
3063+
"has audits, which require a synchronous refresh after creation. This is only "
3064+
"supported with deferred refresh, so the model must set "
3065+
"`refresh_moment = 'DEFERRED'` in its physical_properties "
3066+
f"(got {normalized or 'no refresh_moment; StarRocks defaults to IMMEDIATE'}). "
3067+
"DEFERRED prevents StarRocks' "
3068+
"automatic refresh from racing with the synchronous refresh SQLMesh issues."
3069+
)
3070+
30233071
def _build_refresh_property(
30243072
self, table_properties: t.Dict[str, t.Any]
30253073
) -> t.Optional[exp.RefreshTriggerProperty]:
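
The ordering this file's changes aim for (validate first, then drop, create, and synchronously refresh) can be sketched in isolation. The following is a simplified illustration, not the adapter's code: `plan_mv_statements` and `DeferredRefreshRequired` are hypothetical names, the quote-stripping normalization only approximates what `PropertyValidator` does, and the `CREATE` text is abbreviated.

```python
from typing import List, Optional


class DeferredRefreshRequired(ValueError):
    """Hypothetical stand-in for SQLMeshError in this sketch."""


def plan_mv_statements(
    name: str,
    has_audits: bool,
    refresh_moment: Optional[str],
    replace: bool = True,
) -> List[str]:
    """Return the statements a create would issue, in order (illustrative only)."""
    normalized = refresh_moment.strip("'\" ").upper() if refresh_moment else None

    # Validate before anything destructive so a bad config never drops an existing MV.
    if has_audits and normalized != "DEFERRED":
        got = normalized or "no refresh_moment; StarRocks defaults to IMMEDIATE"
        raise DeferredRefreshRequired(
            f"'{name}' has audits and must set refresh_moment = 'DEFERRED' (got {got})"
        )

    statements: List[str] = []
    if replace:
        statements.append(f"DROP MATERIALIZED VIEW IF EXISTS `{name}`")
    moment = f" {normalized}" if normalized else ""
    # Abbreviated: the real statement carries the full query and properties.
    statements.append(
        f"CREATE MATERIALIZED VIEW `{name}` REFRESH{moment} ASYNC AS SELECT ..."
    )
    if has_audits:
        # Blocks until the MV is populated, so audits have data to check.
        statements.append(f"REFRESH MATERIALIZED VIEW `{name}` WITH SYNC MODE")
    return statements
```

Calling `plan_mv_statements("test_mv", True, "IMMEDIATE")` raises before any statement is planned, which mirrors the fail-fast behavior the tests in this commit assert.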

sqlmesh/core/snapshot/evaluator.py

Lines changed: 15 additions & 4 deletions
@@ -2753,13 +2753,12 @@ def insert(
27532753

27542754
logger.info("Replacing view '%s'", table_name)
27552755
materialized_properties = None
2756-
if is_materialized_view and (
2757-
model.partitioned_by or model.partition_interval_unit or model.clustered_by
2758-
):
2756+
if is_materialized_view:
27592757
materialized_properties = {
27602758
"partitioned_by": model.partitioned_by,
27612759
"partition_interval_unit": model.partition_interval_unit,
27622760
"clustered_by": model.clustered_by,
2761+
"has_audits": bool(model.audits_with_args),
27632762
}
27642763
self.adapter.create_view(
27652764
table_name,
@@ -2818,6 +2817,7 @@ def create(
28182817
"partitioned_by": model.partitioned_by,
28192818
"clustered_by": model.clustered_by,
28202819
"partition_interval_unit": model.partition_interval_unit,
2820+
"has_audits": bool(model.audits_with_args),
28212821
}
28222822
self.adapter.create_view(
28232823
table_name,
@@ -2853,11 +2853,22 @@ def migrate(
28532853
execution_time=now(), snapshots=kwargs["snapshots"], engine_adapter=self.adapter
28542854
)
28552855

2856+
is_materialized_view = self._is_materialized_view(model)
2857+
materialized_properties = None
2858+
if is_materialized_view:
2859+
materialized_properties = {
2860+
"partitioned_by": model.partitioned_by,
2861+
"clustered_by": model.clustered_by,
2862+
"partition_interval_unit": model.partition_interval_unit,
2863+
"has_audits": bool(model.audits_with_args),
2864+
}
2865+
28562866
self.adapter.create_view(
28572867
target_table_name,
28582868
model.render_query_or_raise(**render_kwargs),
28592869
model.columns_to_types,
2860-
materialized=self._is_materialized_view(model),
2870+
materialized=is_materialized_view,
2871+
materialized_properties=materialized_properties,
28612872
view_properties=model.render_physical_properties(**render_kwargs),
28622873
table_description=model.description,
28632874
column_descriptions=model.column_descriptions,
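
The same `materialized_properties` dict is now built in `insert`, `create`, and `migrate`. As a sketch of what each call site computes, the logic could be expressed as one function. `build_materialized_properties` and `ModelStub` are hypothetical names used only for illustration; the commit itself inlines the dict at each call site.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ModelStub:
    """Hypothetical stand-in exposing only the attributes the diff reads."""

    partitioned_by: List[str] = field(default_factory=list)
    partition_interval_unit: Optional[str] = None
    clustered_by: List[str] = field(default_factory=list)
    audits_with_args: List[Any] = field(default_factory=list)


def build_materialized_properties(
    model: ModelStub, is_materialized_view: bool
) -> Optional[Dict[str, Any]]:
    """Build the dict passed as `materialized_properties`, or None for plain views."""
    if not is_materialized_view:
        return None
    return {
        "partitioned_by": model.partitioned_by,
        "partition_interval_unit": model.partition_interval_unit,
        "clustered_by": model.clustered_by,
        # Always present for MVs, so adapters can rely on the key existing.
        "has_audits": bool(model.audits_with_args),
    }
```

Because the dict is now built for every materialized view rather than only partitioned or clustered ones, existing tests that expected `materialized_properties=None` need the updated expectations shown in `tests/core/test_snapshot_evaluator.py`.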

tests/core/engine_adapter/test_starrocks.py

Lines changed: 112 additions & 0 deletions
@@ -350,6 +350,118 @@ def test_create_materialized_view_without_refresh_raises(
350350
view_properties={"replication_num": exp.Literal.string("1")},
351351
)
352352

353+
def test_create_materialized_view_with_audits_emits_sync_refresh(
354+
self, make_mocked_engine_adapter: t.Callable[..., StarRocksEngineAdapter]
355+
):
356+
"""When an MV has audits, SQLMesh must synchronously refresh it right after creation.
357+
358+
Audits require data to exist in the MV. With REFRESH DEFERRED, StarRocks does not populate
359+
the MV on creation, so SQLMesh issues an explicit `REFRESH MATERIALIZED VIEW ... WITH SYNC
360+
MODE` that blocks until the data is materialized.
361+
"""
362+
adapter = make_mocked_engine_adapter(StarRocksEngineAdapter)
363+
adapter.create_view(
364+
"test_mv",
365+
parse_one("SELECT a FROM tbl"),
366+
materialized=True,
367+
target_columns_to_types={"a": exp.DataType.build("INT")},
368+
materialized_properties={"has_audits": True},
369+
view_properties={
370+
"refresh_moment": exp.Var(this="DEFERRED"),
371+
"refresh_scheme": exp.Var(this="ASYNC"),
372+
},
373+
)
374+
375+
calls = to_sql_calls(adapter)
376+
assert calls[0] == "DROP MATERIALIZED VIEW IF EXISTS `test_mv`"
377+
assert "CREATE MATERIALIZED VIEW" in calls[1]
378+
assert "REFRESH DEFERRED" in calls[1]
379+
assert calls[2] == "REFRESH MATERIALIZED VIEW `test_mv` WITH SYNC MODE"
380+
381+
def test_create_materialized_view_with_audits_emits_sync_refresh_on_first_create(
382+
self, make_mocked_engine_adapter: t.Callable[..., StarRocksEngineAdapter]
383+
):
384+
"""The sync refresh must also fire on first-time creation (replace=False).
385+
386+
ViewStrategy.create calls create_view with replace=False, so the DROP is skipped, but the
387+
synchronous refresh still needs to populate the MV before audits run.
388+
"""
389+
adapter = make_mocked_engine_adapter(StarRocksEngineAdapter)
390+
adapter.create_view(
391+
"test_mv",
392+
parse_one("SELECT a FROM tbl"),
393+
replace=False,
394+
materialized=True,
395+
target_columns_to_types={"a": exp.DataType.build("INT")},
396+
materialized_properties={"has_audits": True},
397+
view_properties={
398+
"refresh_moment": exp.Var(this="DEFERRED"),
399+
"refresh_scheme": exp.Var(this="ASYNC"),
400+
},
401+
)
402+
403+
calls = to_sql_calls(adapter)
404+
assert all("DROP MATERIALIZED VIEW" not in sql for sql in calls)
405+
assert "CREATE MATERIALIZED VIEW" in calls[0]
406+
assert "REFRESH DEFERRED" in calls[0]
407+
assert calls[1] == "REFRESH MATERIALIZED VIEW `test_mv` WITH SYNC MODE"
408+
409+
def test_create_materialized_view_with_audits_immediate_refresh_raises(
410+
self, make_mocked_engine_adapter: t.Callable[..., StarRocksEngineAdapter]
411+
):
412+
"""An MV with audits must use REFRESH DEFERRED; IMMEDIATE must raise and not create anything."""
413+
adapter = make_mocked_engine_adapter(StarRocksEngineAdapter)
414+
with pytest.raises(SQLMeshError, match="DEFERRED"):
415+
adapter.create_view(
416+
"test_mv",
417+
parse_one("SELECT a FROM tbl"),
418+
materialized=True,
419+
target_columns_to_types={"a": exp.DataType.build("INT")},
420+
materialized_properties={"has_audits": True},
421+
view_properties={
422+
"refresh_moment": exp.Var(this="IMMEDIATE"),
423+
"refresh_scheme": exp.Var(this="ASYNC"),
424+
},
425+
)
426+
427+
# Fail-fast: nothing should have been dropped or created.
428+
assert all("CREATE MATERIALIZED VIEW" not in sql for sql in to_sql_calls(adapter))
429+
assert all("DROP MATERIALIZED VIEW" not in sql for sql in to_sql_calls(adapter))
430+
431+
def test_create_materialized_view_with_audits_missing_refresh_moment_raises(
432+
self, make_mocked_engine_adapter: t.Callable[..., StarRocksEngineAdapter]
433+
):
434+
"""A missing refresh_moment defaults to IMMEDIATE in StarRocks, so audits must raise."""
435+
adapter = make_mocked_engine_adapter(StarRocksEngineAdapter)
436+
with pytest.raises(SQLMeshError, match="DEFERRED"):
437+
adapter.create_view(
438+
"test_mv",
439+
parse_one("SELECT a FROM tbl"),
440+
materialized=True,
441+
target_columns_to_types={"a": exp.DataType.build("INT")},
442+
materialized_properties={"has_audits": True},
443+
view_properties={"refresh_scheme": exp.Var(this="ASYNC")},
444+
)
445+
446+
def test_create_materialized_view_without_audits_does_not_sync_refresh(
447+
self, make_mocked_engine_adapter: t.Callable[..., StarRocksEngineAdapter]
448+
):
449+
"""Without audits, no synchronous refresh is issued and IMMEDIATE refresh is allowed."""
450+
adapter = make_mocked_engine_adapter(StarRocksEngineAdapter)
451+
adapter.create_view(
452+
"test_mv",
453+
parse_one("SELECT a FROM tbl"),
454+
materialized=True,
455+
target_columns_to_types={"a": exp.DataType.build("INT")},
456+
materialized_properties={"has_audits": False},
457+
view_properties={
458+
"refresh_moment": exp.Var(this="IMMEDIATE"),
459+
"refresh_scheme": exp.Var(this="ASYNC"),
460+
},
461+
)
462+
463+
assert all("WITH SYNC MODE" not in sql for sql in to_sql_calls(adapter))
464+
353465
def test_does_not_recreate_materialized_view_on_evaluation(self):
354466
"""StarRocks async MVs maintain themselves, so SQLMesh must not recreate them on every run.
355467

tests/core/test_snapshot_evaluator.py

Lines changed: 47 additions & 1 deletion
@@ -843,7 +843,12 @@ def test_evaluate_materialized_view_with_execution_time_macro(
843843
view_properties={},
844844
table_description=None,
845845
column_descriptions={},
846-
materialized_properties=None,
846+
materialized_properties={
847+
"partitioned_by": [],
848+
"partition_interval_unit": None,
849+
"clustered_by": [],
850+
"has_audits": False,
851+
},
847852
)
848853

849854

@@ -1305,6 +1310,7 @@ def test_create_materialized_view(mocker: MockerFixture, adapter_mock, make_snap
13051310
"clustered_by": [],
13061311
"partition_interval_unit": None,
13071312
"partitioned_by": [],
1313+
"has_audits": False,
13081314
},
13091315
view_properties={},
13101316
table_description=None,
@@ -1354,6 +1360,7 @@ def test_create_view_with_properties(mocker: MockerFixture, adapter_mock, make_s
13541360
"clustered_by": [],
13551361
"partition_interval_unit": None,
13561362
"partitioned_by": [],
1363+
"has_audits": False,
13571364
},
13581365
table_description=None,
13591366
replace=False,
@@ -1364,6 +1371,45 @@ def test_create_view_with_properties(mocker: MockerFixture, adapter_mock, make_s
13641371
)
13651372

13661373

1374+
def test_create_materialized_view_with_audits_sets_has_audits(
1375+
mocker: MockerFixture, adapter_mock, make_snapshot
1376+
):
1377+
"""A materialized view model with audits must propagate has_audits=True to the adapter.
1378+
1379+
Engines like StarRocks rely on this flag to synchronously refresh the MV before audits run.
1380+
"""
1381+
adapter_mock.get_data_objects.return_value = []
1382+
adapter_mock.table_exists.return_value = False
1383+
1384+
evaluator = SnapshotEvaluator(adapter_mock)
1385+
1386+
model = load_sql_based_model(
1387+
parse( # type: ignore
1388+
"""
1389+
MODEL (
1390+
name test_schema.test_model,
1391+
kind VIEW (
1392+
materialized true
1393+
),
1394+
audits (
1395+
not_null(columns := (a))
1396+
)
1397+
);
1398+
1399+
SELECT a::int FROM tbl;
1400+
"""
1401+
),
1402+
)
1403+
1404+
snapshot = make_snapshot(model)
1405+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
1406+
1407+
evaluator.create([snapshot], {})
1408+
1409+
_, kwargs = adapter_mock.create_view.call_args
1410+
assert kwargs["materialized_properties"]["has_audits"] is True
1411+
1412+
13671413
def test_promote_model_info(mocker: MockerFixture, make_snapshot):
13681414
adapter_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter")
13691415
adapter_mock.dialect = "duckdb"
