Skip to content

Commit e939daf

Browse files
authored
Fix: Improve the inference of the default plan start for new models for which cron is not aligned with interval unit boundary (#3758)
1 parent fc746b2 commit e939daf

2 files changed

Lines changed: 72 additions & 3 deletions

File tree

sqlmesh/core/context.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,7 @@ def run(
649649
True if the run was successful, False otherwise.
650650
"""
651651
environment = environment or self.config.default_target_environment
652+
environment = Environment.sanitize_name(environment)
652653
if not skip_janitor and environment.lower() == c.PROD:
653654
self._run_janitor()
654655

@@ -1344,14 +1345,16 @@ def plan_builder(
13441345
):
13451346
if model_name not in snapshots:
13461347
continue
1347-
interval_unit = snapshots[model_name].node.interval_unit
1348+
node = snapshots[model_name].node
1349+
interval_unit = node.interval_unit
13481350
default_start = min(
13491351
default_start or sys.maxsize,
13501352
to_timestamp(
13511353
interval_unit.cron_prev(
13521354
interval_unit.cron_floor(
1353-
max_interval_end_per_model.get(model_name, default_end),
1354-
estimate=True,
1355+
max_interval_end_per_model.get(
1356+
model_name, node.cron_floor(default_end)
1357+
),
13551358
),
13561359
estimate=True,
13571360
)

tests/core/test_integration.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -701,6 +701,72 @@ def test_cron_not_aligned_with_day_boundary(
701701
]
702702

703703

704+
@time_machine.travel("2023-01-08 00:00:00 UTC")
705+
def test_cron_not_aligned_with_day_boundary_new_model(init_and_plan_context: t.Callable):
706+
context, _ = init_and_plan_context("examples/sushi")
707+
708+
existing_model = context.get_model("sushi.waiter_revenue_by_day")
709+
existing_model = SqlModel.parse_obj(
710+
{
711+
**existing_model.dict(),
712+
"kind": existing_model.kind.copy(update={"forward_only": True}),
713+
}
714+
)
715+
context.upsert_model(existing_model)
716+
717+
plan = context.plan_builder("prod", skip_tests=True).build()
718+
context.apply(plan)
719+
720+
# Add a new model and make a change to a forward-only model.
721+
# The cron of the new model is not aligned with the day boundary.
722+
new_model = load_sql_based_model(
723+
d.parse(
724+
"""
725+
MODEL (
726+
name memory.sushi.new_model,
727+
kind FULL,
728+
cron '0 8 * * *',
729+
start '2023-01-01',
730+
);
731+
732+
SELECT 1 AS one;
733+
"""
734+
)
735+
)
736+
context.upsert_model(new_model)
737+
738+
existing_model = add_projection_to_model(t.cast(SqlModel, existing_model), literal=True)
739+
context.upsert_model(existing_model)
740+
741+
plan = context.plan_builder("dev", skip_tests=True, enable_preview=True).build()
742+
assert plan.missing_intervals == [
743+
SnapshotIntervals(
744+
snapshot_id=context.get_snapshot(
745+
"memory.sushi.new_model", raise_if_missing=True
746+
).snapshot_id,
747+
intervals=[(to_timestamp("2023-01-06"), to_timestamp("2023-01-07"))],
748+
),
749+
SnapshotIntervals(
750+
snapshot_id=context.get_snapshot(
751+
"sushi.top_waiters", raise_if_missing=True
752+
).snapshot_id,
753+
intervals=[
754+
(to_timestamp("2023-01-06"), to_timestamp("2023-01-07")),
755+
(to_timestamp("2023-01-07"), to_timestamp("2023-01-08")),
756+
],
757+
),
758+
SnapshotIntervals(
759+
snapshot_id=context.get_snapshot(
760+
"sushi.waiter_revenue_by_day", raise_if_missing=True
761+
).snapshot_id,
762+
intervals=[
763+
(to_timestamp("2023-01-06"), to_timestamp("2023-01-07")),
764+
(to_timestamp("2023-01-07"), to_timestamp("2023-01-08")),
765+
],
766+
),
767+
]
768+
769+
704770
@time_machine.travel("2023-01-08 00:00:00 UTC")
705771
def test_forward_only_monthly_model(init_and_plan_context: t.Callable):
706772
context, _ = init_and_plan_context("examples/sushi")

0 commit comments

Comments
 (0)