Skip to content

Commit 48ebc1e

Browse files
authored
Feat: Add ability to set the effective date for forward-only snapshots (#785)
1 parent 13552f2 commit 48ebc1e

11 files changed

Lines changed: 221 additions & 20 deletions

File tree

docs/concepts/plans.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ To create a forward-only plan, the `--forward-only` option has to be added to th
7676
sqlmesh plan --forward-only
7777
```
7878

79+
### Effective date
80+
Changes that are part of a forward-only plan can also be applied retroactively to the production environment by specifying an effective date:
81+
```bash
82+
sqlmesh plan --forward-only --effective-from 2023-01-01
83+
```
84+
This way, SQLMesh knows to recompute data intervals starting from the specified date once the forward-only changes are deployed to production.
85+
7986
## Restatement plans
8087
There are cases when models need to be re-evaluated for a given time range, even though changes may not have been made to those model definitions. This could be due to an upstream issue with a dataset defined outside the SQLMesh platform, or when a [forward-only plan](#forward-only-plans) change needs to be applied retroactively to a bounded interval of historical data.
8188

sqlmesh/cli/main.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,12 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None:
212212
is_flag=True,
213213
help="Create a plan for forward-only changes.",
214214
)
215+
@click.option(
216+
"--effective-from",
217+
type=str,
218+
required=False,
219+
help="The effective date from which to apply forward-only changes on production.",
220+
)
215221
@click.option(
216222
"--no-prompts",
217223
is_flag=True,

sqlmesh/core/console.py

Lines changed: 72 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory
1818
from sqlmesh.core.test import ModelTest
1919
from sqlmesh.utils import rich as srich
20-
from sqlmesh.utils.date import to_date
20+
from sqlmesh.utils.date import to_date, yesterday_ds
2121

2222
if t.TYPE_CHECKING:
2323
import ipywidgets as widgets
@@ -292,6 +292,9 @@ def plan(self, plan: Plan, auto_apply: bool) -> None:
292292
plan.apply()
293293

294294
def _show_options_after_categorization(self, plan: Plan, auto_apply: bool) -> None:
295+
if plan.forward_only and plan.new_snapshots:
296+
self._prompt_effective_from(plan, auto_apply)
297+
295298
if plan.requires_backfill:
296299
self._show_missing_dates(plan)
297300
self._prompt_backfill(plan, auto_apply)
@@ -348,22 +351,43 @@ def _show_missing_dates(self, plan: Plan) -> None:
348351
)
349352
self._print(backfill)
350353

354+
def _prompt_effective_from(self, plan: Plan, auto_apply: bool) -> None:
355+
if not plan.effective_from:
356+
effective_from = self._prompt(
357+
"Enter the effective date (eg. '1 year', '2020-01-01') to apply forward-only changes retroactively or blank to only apply them going forward once changes are deployed to prod"
358+
)
359+
if effective_from:
360+
plan.effective_from = effective_from
361+
362+
if plan.is_dev and plan.effective_from:
363+
plan.set_start(plan.effective_from)
364+
351365
def _prompt_backfill(self, plan: Plan, auto_apply: bool) -> None:
352366
is_forward_only_dev = plan.is_dev and plan.forward_only
353367
backfill_or_preview = "preview" if is_forward_only_dev else "backfill"
354368

355369
if plan.is_start_and_end_allowed:
356370
if not plan.override_start:
357-
blank_meaning = (
358-
"to preview starting from yesterday"
359-
if is_forward_only_dev
360-
else "for the beginning of history"
361-
)
371+
if is_forward_only_dev:
372+
if plan.effective_from:
373+
blank_meaning = (
374+
f"to preview starting from the effective date ('{plan.effective_from}')"
375+
)
376+
default_start = plan.effective_from
377+
else:
378+
blank_meaning = "to preview starting from yesterday"
379+
default_start = yesterday_ds()
380+
else:
381+
blank_meaning = "to backfill from the beginning of history"
382+
default_start = None
383+
362384
start = self._prompt(
363385
f"Enter the {backfill_or_preview} start date (eg. '1 year', '2020-01-01') or blank {blank_meaning}",
364386
)
365387
if start:
366388
plan.start = start
389+
elif default_start:
390+
plan.start = default_start
367391

368392
if not plan.override_end:
369393
end = self._prompt(
@@ -533,6 +557,48 @@ def _prompt_promote(self, plan: Plan) -> None:
533557
button.on_click(self._apply)
534558
button.output = output
535559

560+
def _prompt_effective_from(self, plan: Plan, auto_apply: bool) -> None:
561+
import ipywidgets as widgets
562+
563+
prompt = widgets.VBox()
564+
565+
def effective_from_change_callback(change: t.Dict[str, datetime.datetime]) -> None:
566+
plan.effective_from = change["new"]
567+
self._show_options_after_categorization(plan, auto_apply)
568+
569+
def going_forward_change_callback(change: t.Dict[str, bool]) -> None:
570+
checked = change["new"]
571+
plan.effective_from = None if checked else yesterday_ds()
572+
self._show_options_after_categorization(plan, auto_apply=auto_apply)
573+
574+
date_picker = widgets.DatePicker(
575+
disabled=plan.effective_from is None,
576+
value=to_date(plan.effective_from or yesterday_ds()),
577+
layout={"width": "auto"},
578+
)
579+
date_picker.observe(effective_from_change_callback, "value")
580+
581+
going_forward_checkbox = widgets.Checkbox(
582+
value=plan.effective_from is None,
583+
description="Apply Going Forward Once Deployed To Prod",
584+
disabled=False,
585+
indent=False,
586+
)
587+
going_forward_checkbox.observe(going_forward_change_callback, "value")
588+
589+
add_to_layout_widget(
590+
prompt,
591+
widgets.HBox(
592+
[
593+
widgets.Label("Effective From Date:", layout={"width": "8rem"}),
594+
date_picker,
595+
going_forward_checkbox,
596+
]
597+
),
598+
)
599+
600+
self._add_to_dynamic_options(prompt)
601+
536602
def _prompt_backfill(self, plan: Plan, auto_apply: bool) -> None:
537603
import ipywidgets as widgets
538604

@@ -552,17 +618,6 @@ def _date_picker(
552618
picker.observe(on_change, "value")
553619
return picker
554620

555-
def _checkbox(description: str, value: bool, on_change: t.Callable) -> widgets.Checkbox:
556-
checkbox = widgets.Checkbox(
557-
value=value,
558-
description=description,
559-
disabled=False,
560-
indent=False,
561-
)
562-
563-
checkbox.observe(on_change, "value")
564-
return checkbox
565-
566621
def start_change_callback(change: t.Dict[str, datetime.datetime]) -> None:
567622
plan.start = change["new"]
568623
self._show_options_after_categorization(plan, auto_apply)

sqlmesh/core/context.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,7 @@ def plan(
584584
no_prompts: bool = False,
585585
auto_apply: bool = False,
586586
no_auto_categorization: t.Optional[bool] = None,
587+
effective_from: t.Optional[TimeLike] = None,
587588
) -> Plan:
588589
"""Interactively create a migration plan.
589590
@@ -616,6 +617,7 @@ def plan(
616617
no_auto_categorization: Indicates whether to disable automatic categorization of model
617618
changes (breaking / non-breaking). If not provided, then the corresponding configuration
618619
option determines the behavior.
620+
effective_from: The effective date from which to apply forward-only changes on production.
619621
620622
Returns:
621623
The populated Plan object.
@@ -645,6 +647,7 @@ def plan(
645647
environment_ttl=self.environment_ttl,
646648
categorizer_config=self.auto_categorize_changes,
647649
auto_categorization_enabled=not no_auto_categorization,
650+
effective_from=effective_from,
648651
)
649652

650653
if not no_prompts:

sqlmesh/core/plan/definition.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ class Plan:
6060
environment_ttl: The period of time that a development environment should exist before being deleted.
6161
categorizer_config: Auto categorization settings.
6262
auto_categorization_enabled: Whether to apply auto categorization.
63+
effective_from: The effective date from which to apply forward-only changes on production.
6364
"""
6465

6566
def __init__(
@@ -78,6 +79,7 @@ def __init__(
7879
environment_ttl: t.Optional[str] = None,
7980
categorizer_config: t.Optional[CategorizerConfig] = None,
8081
auto_categorization_enabled: bool = True,
82+
effective_from: t.Optional[TimeLike] = None,
8183
):
8284
self.context_diff = context_diff
8385
self.override_start = start is not None
@@ -90,6 +92,7 @@ def __init__(
9092
self.environment_ttl = environment_ttl
9193
self.categorizer_config = categorizer_config or CategorizerConfig()
9294
self.auto_categorization_enabled = auto_categorization_enabled
95+
self._effective_from: t.Optional[TimeLike] = None
9396
self._start = start if start or not (is_dev and forward_only) else yesterday_ds()
9497
self._end = end if end or not is_dev else now()
9598
self._latest = latest or now()
@@ -133,6 +136,9 @@ def __init__(
133136
self._categorized: t.Optional[t.List[Snapshot]] = None
134137
self._uncategorized: t.Optional[t.List[Snapshot]] = None
135138

139+
if effective_from:
140+
self._set_effective_from(effective_from)
141+
136142
@property
137143
def categorized(self) -> t.List[Snapshot]:
138144
"""Returns the already categorized snapshots."""
@@ -319,6 +325,40 @@ def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> None
319325
self._categorized = None
320326
self._uncategorized = None
321327

328+
@property
329+
def effective_from(self) -> t.Optional[TimeLike]:
330+
"""The effective date for all new snapshots in the plan.
331+
332+
Note: this is only applicable for forward-only plans.
333+
334+
Returns:
335+
The effective date.
336+
"""
337+
return self._effective_from
338+
339+
@effective_from.setter
340+
def effective_from(self, effective_from: t.Optional[TimeLike]) -> None:
341+
"""Sets the effective date for all new snapshots in the plan.
342+
343+
Note: this is only applicable for forward-only plans.
344+
345+
Args:
346+
effective_from: The effective date to set.
347+
"""
348+
self._set_effective_from(effective_from)
349+
350+
def _set_effective_from(self, effective_from: t.Optional[TimeLike]) -> None:
351+
if not self.forward_only:
352+
raise PlanError("Effective date can only be set for a forward-only plan.")
353+
if effective_from and to_datetime(effective_from) > now():
354+
raise PlanError("Effective date cannot be in the future.")
355+
356+
self.__missing_intervals = None
357+
self._effective_from = effective_from
358+
359+
for snapshot in self.new_snapshots:
360+
snapshot.effective_from = effective_from
361+
322362
@property
323363
def _missing_intervals(self) -> t.Dict[str, Intervals]:
324364
if self.__missing_intervals is None:

sqlmesh/core/snapshot/definition.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,8 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
295295
change_category: User specified change category indicating which models require backfill from model changes made in this snapshot.
296296
unpaused_ts: The timestamp which indicates when this snapshot was unpaused. Unpaused means that
297297
this snapshot is evaluated on a recurring basis. None indicates that this snapshot is paused.
298+
effective_from: The timestamp which indicates when this snapshot should be considered effective.
299+
Applicable for forward-only snapshots only.
298300
"""
299301

300302
name: str
@@ -315,6 +317,7 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
315317
temp_version: t.Optional[str] = None
316318
change_category: t.Optional[SnapshotChangeCategory] = None
317319
unpaused_ts: t.Optional[int] = None
320+
effective_from: t.Optional[TimeLike] = None
318321

319322
@validator("ttl")
320323
@classmethod
@@ -499,8 +502,16 @@ def merge_intervals(self, other: Snapshot) -> None:
499502
Args:
500503
other: The target snapshot to inherit intervals from.
501504
"""
505+
effective_from_ts = to_timestamp(self.effective_from) if self.effective_from else 0
506+
apply_effective_from = effective_from_ts > 0 and self.fingerprint != other.fingerprint
507+
502508
for start, end in other.intervals:
503-
self.add_interval(start, end)
509+
# If the effective_from is set, then intervals that come after it must come from
510+
# the current snapshot.
511+
if apply_effective_from and start < effective_from_ts:
512+
end = min(end, effective_from_ts)
513+
if not apply_effective_from or end <= effective_from_ts:
514+
self.add_interval(start, end)
504515

505516
def missing_intervals(
506517
self, start: TimeLike, end: TimeLike, latest: t.Optional[TimeLike] = None

sqlmesh/magics.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,11 @@ def test(self, line: str, test_def_raw: t.Optional[str] = None) -> None:
213213
action="store_true",
214214
help="Create a plan for forward-only changes.",
215215
)
216+
@argument(
217+
"--effective-from",
218+
type=str,
219+
help="The effective date from which to apply forward-only changes on production.",
220+
)
216221
@argument(
217222
"--no-prompts",
218223
action="store_true",
@@ -253,6 +258,7 @@ def plan(self, line: str) -> None:
253258
no_prompts=args.no_prompts,
254259
auto_apply=args.auto_apply,
255260
no_auto_categorization=args.no_auto_categorization,
261+
effective_from=args.effective_from,
256262
)
257263
self._context.console = console
258264

sqlmesh/schedulers/airflow/plan.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,18 @@ def create_plan_dag_spec(
5151
snapshots_for_intervals.pop(sid)
5252

5353
if request.restatements:
54+
snapshots_for_restatement = (
55+
snapshots_for_intervals.values()
56+
if not request.is_dev
57+
else [snapshots_for_intervals[s.snapshot_id] for s in request.environment.snapshots]
58+
)
5459
state_sync.remove_interval(
5560
[],
5661
start=request.environment.start_at,
5762
end=end,
5863
all_snapshots=(
5964
snapshot
60-
for snapshot in snapshots_for_intervals.values()
65+
for snapshot in snapshots_for_restatement
6166
if snapshot.name in request.restatements
6267
and snapshot.snapshot_id not in new_snapshots
6368
),

tests/core/test_integration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,7 @@ def apply_to_environment(
667667
plan = context.plan(
668668
environment,
669669
forward_only=choice == SnapshotChangeCategory.FORWARD_ONLY,
670+
no_prompts=True,
670671
)
671672
plan.set_start(start(context))
672673

tests/core/test_plan.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
SnapshotDataVersion,
1414
SnapshotFingerprint,
1515
)
16-
from sqlmesh.utils.date import to_date, to_datetime, to_timestamp
16+
from sqlmesh.utils.date import now, to_date, to_datetime, to_timestamp
1717
from sqlmesh.utils.errors import PlanError
1818

1919

@@ -423,3 +423,43 @@ def test_broken_references(make_snapshot, mocker: MockerFixture):
423423
match=r"Removed models {'a'} are referenced in model 'b'.*",
424424
):
425425
Plan(context_diff_mock, state_reader_mock)
426+
427+
428+
def test_effective_from(make_snapshot, mocker: MockerFixture):
429+
snapshot = make_snapshot(SqlModel(name="a", query=parse_one("select 1, ds FROM a")))
430+
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
431+
432+
context_diff_mock = mocker.Mock()
433+
context_diff_mock.snapshots = {"a": snapshot}
434+
context_diff_mock.added = set()
435+
context_diff_mock.removed = set()
436+
context_diff_mock.modified_snapshots = {}
437+
context_diff_mock.new_snapshots = {snapshot.snapshot_id: snapshot}
438+
439+
state_reader_mock = mocker.Mock()
440+
441+
with pytest.raises(
442+
PlanError,
443+
match="Effective date can only be set for a forward-only plan.",
444+
):
445+
plan = Plan(context_diff_mock, state_reader_mock)
446+
plan.effective_from = "2023-01-01"
447+
448+
plan = Plan(context_diff_mock, state_reader_mock, forward_only=True)
449+
450+
with pytest.raises(
451+
PlanError,
452+
match="Effective date cannot be in the future.",
453+
):
454+
plan.effective_from = now() + timedelta(days=1)
455+
456+
assert plan.effective_from is None
457+
assert snapshot.effective_from is None
458+
459+
plan.effective_from = "2023-01-01"
460+
assert plan.effective_from == "2023-01-01"
461+
assert snapshot.effective_from == "2023-01-01"
462+
463+
plan.effective_from = None
464+
assert plan.effective_from is None
465+
assert snapshot.effective_from is None

0 commit comments

Comments
 (0)