Skip to content

Commit 730c660

Browse files
authored
Feat: Use view name for targets in backfill progress (#661)
* use view name for targets in backfill progress * use view name for targets in backfill progress * cleanup
1 parent e41cc70 commit 730c660

File tree

8 files changed

+39
-20
lines changed

8 files changed

+39
-20
lines changed

docs/guides/models.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,13 @@ Enter the backfill start date (eg. '1 year', '2020-01-01') or blank for the begi
9191
Enter the backfill end date (eg. '1 month ago', '2020-01-01') or blank to backfill up until now:
9292
Apply - Backfill Tables [y/n]: y
9393
94+
sqlmesh_example__dev.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00
95+
9496
All model batches have been executed successfully
9597
96-
sqlmesh_example.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00
98+
Virtually Updating 'dev' ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 0:00:00
99+
100+
The target environment has been updated successfully
97101
```
98102

99103
For more information, refer to [plans](../concepts/plans.md).
@@ -220,7 +224,7 @@ To delete a model:
220224

221225
All model batches have been executed successfully
222226

223-
sqlmesh_example.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00
227+
sqlmesh_example__dev.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00
224228
```
225229

226230
**Note:** If you have other files that reference the model you wish to delete, an error message will note the file(s) containing the reference. You will need to also delete these files in order to apply the change.

docs/quick_start.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ Models needing backfill (missing dates):
141141
Enter the backfill start date (eg. '1 year', '2020-01-01') or blank for the beginning of history:
142142
Enter the backfill end date (eg. '1 month ago', '2020-01-01') or blank to backfill up until now:
143143
Apply - Backfill Tables [y/n]: y
144-
sqlmesh_example.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00
144+
145+
sqlmesh_example__dev.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00
145146

146147
All model batches have been executed successfully
147148

sqlmesh/core/console.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ class Console(abc.ABC):
4040
with them when their input is needed"""
4141

4242
@abc.abstractmethod
43-
def start_snapshot_progress(self, snapshot_name: str, total_batches: int) -> None:
43+
def start_snapshot_progress(
44+
self, snapshot: Snapshot, total_batches: int, environment: str
45+
) -> None:
4446
"""Indicates that a new load progress has begun."""
4547

4648
@abc.abstractmethod
@@ -138,11 +140,13 @@ def _prompt(self, message: str, **kwargs: t.Any) -> t.Any:
138140
def _confirm(self, message: str, **kwargs: t.Any) -> bool:
139141
return Confirm.ask(message, console=self.console, **kwargs)
140142

141-
def start_snapshot_progress(self, snapshot_name: str, total_batches: int) -> None:
143+
def start_snapshot_progress(
144+
self, snapshot: Snapshot, total_batches: int, environment: str
145+
) -> None:
142146
"""Indicates that a new load progress has begun."""
143147
if not self.evaluation_progress:
144148
self.evaluation_progress = Progress(
145-
TextColumn("[bold blue]{task.fields[snapshot_name]}", justify="right"),
149+
TextColumn("[bold blue]{task.fields[view_name]}", justify="right"),
146150
BarColumn(bar_width=40),
147151
"[progress.percentage]{task.percentage:>3.1f}%",
148152
"•",
@@ -153,10 +157,11 @@ def start_snapshot_progress(self, snapshot_name: str, total_batches: int) -> Non
153157
)
154158
self.evaluation_progress.start()
155159
self.evaluation_tasks = {}
156-
self.evaluation_tasks[snapshot_name] = (
160+
view_name = snapshot.qualified_view_name.for_environment(environment)
161+
self.evaluation_tasks[snapshot.name] = (
157162
self.evaluation_progress.add_task(
158-
f"Running {snapshot_name}...",
159-
snapshot_name=snapshot_name,
163+
f"Running {view_name}...",
164+
view_name=view_name,
160165
total=total_batches,
161166
),
162167
total_batches,

sqlmesh/core/context.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,13 +399,14 @@ def run(
399399
"""Run the entire dag through the scheduler.
400400
401401
Args:
402-
environment: The target environment to source model snapshots from. Default: prod.
402+
environment: The target environment to source model snapshots from and virtually update. Default: prod.
403403
start: The start of the interval to render.
404404
end: The end of the interval to render.
405405
latest: The latest time used for non incremental datasets.
406-
skip_janitor: Whether to skip the jantitor task.
406+
skip_janitor: Whether to skip the janitor task.
407407
"""
408-
self.scheduler(environment=environment or c.PROD).run(start, end, latest)
408+
environment = environment or c.PROD
409+
self.scheduler(environment=environment).run(environment, start, end, latest)
409410

410411
if not skip_janitor:
411412
self._run_janitor()

sqlmesh/core/plan/evaluator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def evaluate(self, plan: Plan) -> None:
7373
max_workers=self.backfill_concurrent_tasks,
7474
console=self.console,
7575
)
76-
is_run_successful = scheduler.run(plan.start, plan.end, is_dev=plan.is_dev)
76+
is_run_successful = scheduler.run(plan.environment.name, plan.start, plan.end)
7777
if not is_run_successful:
7878
raise SQLMeshError("Plan application failed.")
7979

sqlmesh/core/scheduler.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import typing as t
55
from datetime import datetime
66

7+
from sqlmesh.core import constants as c
78
from sqlmesh.core.console import Console, get_console
89
from sqlmesh.core.snapshot import (
910
Snapshot,
@@ -138,25 +139,25 @@ def evaluate(
138139

139140
def run(
140141
self,
142+
environment: str,
141143
start: t.Optional[TimeLike] = None,
142144
end: t.Optional[TimeLike] = None,
143145
latest: t.Optional[TimeLike] = None,
144-
is_dev: bool = False,
145146
) -> bool:
146147
"""Concurrently runs all snapshots in topological order.
147148
148149
Args:
150+
environment: The environment the user is targeting when applying their change.
149151
start: The start of the run. Defaults to the min model start date.
150152
end: The end of the run. Defaults to now.
151153
latest: The latest datetime to use for non-incremental queries.
152-
is_dev: Indicates whether the evaluation happens in the development mode and temporary
153-
tables / table clones should be used where applicable.
154154
155155
Returns:
156156
True if the execution was successful and False otherwise.
157157
"""
158158
validate_date_range(start, end)
159159

160+
is_dev = environment != c.PROD
160161
latest = latest or now()
161162
batches = self.batches(start, end, latest, is_dev=is_dev)
162163
dag = self._dag(batches)
@@ -167,7 +168,7 @@ def run(
167168
continue
168169
visited.add(snapshot)
169170
intervals = batches[snapshot]
170-
self.console.start_snapshot_progress(snapshot.name, len(intervals))
171+
self.console.start_snapshot_progress(snapshot, len(intervals), environment)
171172

172173
def evaluate_node(node: SchedulingUnit) -> None:
173174
assert latest

tests/core/test_scheduler.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pytest
22
from sqlglot import parse_one
33

4+
from sqlmesh.core import constants as c
45
from sqlmesh.core.context import Context
56
from sqlmesh.core.scheduler import Scheduler
67
from sqlmesh.core.snapshot import Snapshot, SnapshotFingerprint
@@ -116,6 +117,7 @@ def test_run(sushi_context_pre_scheduling: Context, scheduler: Scheduler):
116117
adapter = sushi_context_pre_scheduling.engine_adapter
117118
snapshot = sushi_context_pre_scheduling.snapshots["sushi.items"]
118119
scheduler.run(
120+
c.PROD,
119121
"2022-01-01",
120122
"2022-01-03",
121123
"2022-01-30",

web/server/console.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import unittest
77

88
from sqlmesh.core.console import TerminalConsole
9+
from sqlmesh.core.snapshot import Snapshot
910
from sqlmesh.core.test import ModelTest
1011
from sqlmesh.utils.date import now_timestamp
1112
from web.server.sse import Event
@@ -14,7 +15,7 @@
1415
class ApiConsole(TerminalConsole):
1516
def __init__(self) -> None:
1617
super().__init__()
17-
self.current_task_status: t.Dict[str, t.Dict[str, int]] = {}
18+
self.current_task_status: t.Dict[str, t.Dict[str, t.Any]] = {}
1819
self.queue: asyncio.Queue = asyncio.Queue()
1920

2021
def _make_event(
@@ -33,12 +34,16 @@ def _make_event(
3334
data=json.dumps(payload),
3435
)
3536

36-
def start_snapshot_progress(self, snapshot_name: str, total_batches: int) -> None:
37+
def start_snapshot_progress(
38+
self, snapshot: Snapshot, total_batches: int, environment: str
39+
) -> None:
3740
"""Indicates that a new load progress has begun."""
38-
self.current_task_status[snapshot_name] = {
41+
view_name = snapshot.qualified_view_name.for_environment(environment)
42+
self.current_task_status[snapshot.name] = {
3943
"completed": 0,
4044
"total": total_batches,
4145
"start": now_timestamp(),
46+
"view_name": view_name,
4247
}
4348

4449
def update_snapshot_progress(self, snapshot_name: str, num_batches: int) -> None:

0 commit comments

Comments
 (0)