Skip to content

Commit deaaca5

Browse files
committed
Feat: Check freshness for mixed external & sqlmesh models
1 parent 2dd01a4 commit deaaca5

File tree

6 files changed

+391
-170
lines changed

6 files changed

+391
-170
lines changed

.circleci/continue_config.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -313,10 +313,10 @@ workflows:
313313
- athena
314314
- fabric
315315
- gcp-postgres
316-
filters:
317-
branches:
318-
only:
319-
- main
316+
# filters:
317+
# branches:
318+
# only:
319+
# - main
320320
- ui_style
321321
- ui_test
322322
- vscode_test

sqlmesh/core/scheduler.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,11 +373,19 @@ def batch_intervals(
373373
is_restatement=is_restatement,
374374
)
375375

376+
parent_intervals = []
377+
for parent in snapshot.parents:
378+
if parent.snapshot_id not in snapshot_intervals:
379+
continue
380+
_, p_intervals = snapshot_intervals[parent.snapshot_id]
381+
parent_intervals.append(p_intervals)
382+
376383
intervals = self._check_ready_intervals(
377384
snapshot,
378385
intervals,
379386
context,
380387
environment_naming_info,
388+
parent_intervals=parent_intervals,
381389
)
382390
unready -= set(intervals)
383391

@@ -923,6 +931,7 @@ def _check_ready_intervals(
923931
intervals: Intervals,
924932
context: ExecutionContext,
925933
environment_naming_info: EnvironmentNamingInfo,
934+
parent_intervals: t.Optional[t.List[Intervals]] = None,
926935
) -> Intervals:
927936
"""Checks if the intervals are ready for evaluation for the given snapshot.
928937
@@ -965,6 +974,7 @@ def _check_ready_intervals(
965974
dialect=snapshot.model.dialect,
966975
path=snapshot.model._path,
967976
snapshot=snapshot,
977+
parent_intervals=parent_intervals,
968978
kwargs=kwargs,
969979
)
970980
except SQLMeshError as e:

sqlmesh/core/signal.py

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22

33
import typing as t
44
from sqlmesh.utils import UniqueKeyDict, registry_decorator
5+
from sqlmesh.utils.errors import MissingSourceError
56

67
if t.TYPE_CHECKING:
78
from sqlmesh.core.context import ExecutionContext
89
from sqlmesh.core.snapshot.definition import Snapshot
910
from sqlmesh.utils.date import DatetimeRanges
1011
from sqlmesh.core.snapshot.definition import DeployabilityIndex
12+
from sqlmesh.core.snapshot.definition import Intervals
1113

1214

1315
class signal(registry_decorator):
@@ -42,7 +44,17 @@ class signal(registry_decorator):
4244

4345

4446
@signal()
45-
def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionContext) -> bool:
47+
def freshness(
48+
batch: DatetimeRanges,
49+
snapshot: Snapshot,
50+
context: ExecutionContext,
51+
parent_intervals: t.Optional[t.List[Intervals]] = None,
52+
) -> bool:
53+
"""
54+
Implements model freshness as a signal, i.e it considers this model to be fresh if:
55+
- Any upstream SQLMesh model has available intervals to compute i.e is fresh
56+
- Any upstream external model has been altered since the last time the model was evaluated
57+
"""
4658
adapter = context.engine_adapter
4759
if context.is_restatement or not adapter.SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS:
4860
return True
@@ -54,24 +66,38 @@ def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionConte
5466
if deployability_index.is_deployable(snapshot)
5567
else snapshot.dev_last_altered_ts
5668
)
69+
5770
if not last_altered_ts:
5871
return True
5972

6073
parent_snapshots = {context.snapshots[p.name] for p in snapshot.parents}
61-
if len(parent_snapshots) != len(snapshot.node.depends_on) or not all(
62-
p.is_external for p in parent_snapshots
63-
):
74+
if len(parent_snapshots) != len(snapshot.node.depends_on):
6475
# The mismatch can happen if e.g an external model is not registered in the project
6576
return True
6677

67-
# Finding new data means that the upstream depedencies have been altered
68-
# since the last time the model was evaluated
69-
upstream_dep_has_new_data = any(
70-
upstream_last_altered_ts > last_altered_ts
71-
for upstream_last_altered_ts in adapter.get_table_last_modified_ts(
72-
[p.name for p in parent_snapshots]
78+
external_parent_snapshots = {p for p in parent_snapshots if p.is_external}
79+
upstream_parent_snapshots = parent_snapshots - external_parent_snapshots
80+
81+
if upstream_parent_snapshots and parent_intervals:
82+
# At least one upstream sqlmesh model has intervals to compute (i.e is not fresh),
83+
# so the current model should be considered fresh
84+
return True
85+
86+
if external_parent_snapshots:
87+
external_last_altered_timestamps = adapter.get_table_last_modified_ts(
88+
[sp.name for sp in external_parent_snapshots]
89+
)
90+
91+
if len(external_last_altered_timestamps) != len(external_parent_snapshots):
92+
raise MissingSourceError(
93+
f"Expected {len(external_parent_snapshots)} sources to be present, but got {len(external_last_altered_timestamps)}."
94+
)
95+
96+
# Finding new data means that the upstream depedencies have been altered
97+
# since the last time the model was evaluated
98+
return any(
99+
upstream_last_altered_ts > last_altered_ts
100+
for upstream_last_altered_ts in external_last_altered_timestamps
73101
)
74-
)
75102

76-
# Returning true is a no-op, returning False nullifies the batch so the model will not be evaluated.
77-
return upstream_dep_has_new_data
103+
return False

sqlmesh/core/snapshot/definition.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2451,6 +2451,7 @@ def check_ready_intervals(
24512451
dialect: DialectType = None,
24522452
path: t.Optional[Path] = None,
24532453
snapshot: t.Optional[Snapshot] = None,
2454+
parent_intervals: t.Optional[t.List[Intervals]] = None,
24542455
kwargs: t.Optional[t.Dict] = None,
24552456
) -> Intervals:
24562457
checked_intervals: Intervals = []
@@ -2467,6 +2468,7 @@ def check_ready_intervals(
24672468
provided_kwargs=(kwargs or {}),
24682469
context=context,
24692470
snapshot=snapshot,
2471+
parent_intervals=parent_intervals,
24702472
)
24712473
except Exception as ex:
24722474
raise SignalEvalError(format_evaluated_code_exception(ex, python_env))

0 commit comments

Comments
 (0)