Skip to content

Commit a5cd41c

Browse files
authored
Feat: add progress bar for snapshot creation (#1157)
* Feat: add progress bar for snapshot creation * Rephrase progress bar status message * Cleanup * Cleanup
1 parent 9f0e66c commit a5cd41c

7 files changed

Lines changed: 150 additions & 61 deletions

File tree

sqlmesh/core/console.py

Lines changed: 89 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -44,38 +44,50 @@ class Console(abc.ABC):
4444
with them when their input is needed."""
4545

4646
@abc.abstractmethod
47-
def start_snapshot_progress(
47+
def start_evaluation_progress(
4848
self, snapshot: Snapshot, total_batches: int, environment: str
4949
) -> None:
50-
"""Indicates that a new load progress has begun."""
50+
"""Indicates that a new snapshot evaluation progress has begun."""
5151

5252
@abc.abstractmethod
53-
def update_snapshot_progress(self, snapshot_name: str, num_batches: int) -> None:
54-
"""Update snapshot progress."""
53+
def update_evaluation_progress(self, snapshot_name: str, num_batches: int) -> None:
54+
"""Update the snapshot evaluation progress."""
5555

5656
@abc.abstractmethod
57-
def stop_snapshot_progress(self, success: bool = True) -> None:
58-
"""Stop the load progress."""
57+
def stop_evaluation_progress(self, success: bool = True) -> None:
58+
"""Stop the snapshot evaluation progress."""
59+
60+
@abc.abstractmethod
61+
def start_creation_progress(self, environment: str, total_tasks: int) -> None:
62+
"""Indicates that a new snapshot creation progress has begun."""
63+
64+
@abc.abstractmethod
65+
def update_creation_progress(self, num_tasks: int) -> None:
66+
"""Update the snapshot creation progress."""
67+
68+
@abc.abstractmethod
69+
def stop_creation_progress(self, success: bool = True) -> None:
70+
"""Stop the snapshot creation progress."""
5971

6072
@abc.abstractmethod
6173
def start_promotion_progress(self, environment: str, total_tasks: int) -> None:
62-
"""Indicates that a new promotion progress has begun."""
74+
"""Indicates that a new snapshot promotion progress has begun."""
6375

6476
@abc.abstractmethod
6577
def update_promotion_progress(self, num_tasks: int) -> None:
66-
"""Update promotion progress."""
78+
"""Update the snapshot promotion progress."""
6779

6880
@abc.abstractmethod
6981
def stop_promotion_progress(self, success: bool = True) -> None:
70-
"""Stop the promotion progress."""
82+
"""Stop the snapshot promotion progress."""
7183

7284
@abc.abstractmethod
7385
def start_migration_progress(self, total_tasks: int) -> None:
7486
"""Indicates that a new migration progress has begun."""
7587

7688
@abc.abstractmethod
7789
def update_migration_progress(self, num_tasks: int) -> None:
78-
"""Update migration progress."""
90+
"""Update the migration progress."""
7991

8092
@abc.abstractmethod
8193
def stop_migration_progress(self, success: bool = True) -> None:
@@ -85,7 +97,7 @@ def stop_migration_progress(self, success: bool = True) -> None:
8597
def show_model_difference_summary(
8698
self, context_diff: ContextDiff, detailed: bool = False
8799
) -> None:
88-
"""Displays a summary of differences for the given models"""
100+
"""Displays a summary of differences for the given models."""
89101

90102
@abc.abstractmethod
91103
def plan(self, plan: Plan, auto_apply: bool) -> None:
@@ -151,6 +163,8 @@ def __init__(self, console: t.Optional[RichConsole] = None, **kwargs: t.Any) ->
151163
self.console: RichConsole = console or srich.console
152164
self.evaluation_progress: t.Optional[Progress] = None
153165
self.evaluation_tasks: t.Dict[str, t.Tuple[TaskID, int]] = {}
166+
self.creation_progress: t.Optional[Progress] = None
167+
self.creation_task: t.Optional[TaskID] = None
154168
self.promotion_progress: t.Optional[Progress] = None
155169
self.promotion_task: t.Optional[TaskID] = None
156170
self.migration_progress: t.Optional[Progress] = None
@@ -166,10 +180,10 @@ def _prompt(self, message: str, **kwargs: t.Any) -> t.Any:
166180
def _confirm(self, message: str, **kwargs: t.Any) -> bool:
167181
return Confirm.ask(message, console=self.console, **kwargs)
168182

169-
def start_snapshot_progress(
183+
def start_evaluation_progress(
170184
self, snapshot: Snapshot, total_batches: int, environment: str
171185
) -> None:
172-
"""Indicates that a new load progress has begun."""
186+
"""Indicates that a new snapshot evaluation progress has begun."""
173187
if not self.evaluation_progress:
174188
self.evaluation_progress = Progress(
175189
TextColumn("[bold blue]{task.fields[view_name]}", justify="right"),
@@ -194,23 +208,59 @@ def start_snapshot_progress(
194208
total_batches,
195209
)
196210

197-
def update_snapshot_progress(self, snapshot_name: str, num_batches: int) -> None:
198-
"""Update snapshot progress."""
211+
def update_evaluation_progress(self, snapshot_name: str, num_batches: int) -> None:
212+
"""Update the snapshot evaluation progress."""
199213
if self.evaluation_progress and self.evaluation_tasks:
200214
task_id = self.evaluation_tasks[snapshot_name][0]
201215
self.evaluation_progress.update(task_id, refresh=True, advance=num_batches)
202216

203-
def stop_snapshot_progress(self, success: bool = True) -> None:
204-
"""Stop the load progress"""
217+
def stop_evaluation_progress(self, success: bool = True) -> None:
218+
"""Stop the snapshot evaluation progress."""
205219
self.evaluation_tasks = {}
206220
if self.evaluation_progress:
207221
self.evaluation_progress.stop()
208222
self.evaluation_progress = None
209223
if success:
210224
self.log_success("All model batches have been executed successfully")
211225

226+
def start_creation_progress(self, environment: str, total_tasks: int) -> None:
227+
"""Indicates that a new creation progress has begun."""
228+
if self.creation_progress is None:
229+
self.creation_progress = Progress(
230+
TextColumn(
231+
f"[bold blue]Creating new model versions for '{environment}'", justify="right"
232+
),
233+
BarColumn(bar_width=40),
234+
"[progress.percentage]{task.percentage:>3.1f}%",
235+
"•",
236+
srich.BatchColumn(),
237+
"•",
238+
TimeElapsedColumn(),
239+
console=self.console,
240+
)
241+
242+
self.creation_progress.start()
243+
self.creation_task = self.creation_progress.add_task(
244+
f"Creating new model versions for {environment}...",
245+
total=total_tasks,
246+
)
247+
248+
def update_creation_progress(self, num_tasks: int) -> None:
249+
"""Update the snapshot creation progress."""
250+
if self.creation_progress is not None and self.creation_task is not None:
251+
self.creation_progress.update(self.creation_task, refresh=True, advance=num_tasks)
252+
253+
def stop_creation_progress(self, success: bool = True) -> None:
254+
"""Stop the snapshot creation progress."""
255+
self.creation_task = None
256+
if self.creation_progress is not None:
257+
self.creation_progress.stop()
258+
self.creation_progress = None
259+
if success:
260+
self.log_success("All model versions have been created successfully")
261+
212262
def start_promotion_progress(self, environment: str, total_tasks: int) -> None:
213-
"""Indicates that a new promotion progress has begun."""
263+
"""Indicates that a new snapshot promotion progress has begun."""
214264
if self.promotion_progress is None:
215265
self.promotion_progress = Progress(
216266
TextColumn(f"[bold blue]Virtually Updating '{environment}'", justify="right"),
@@ -228,12 +278,12 @@ def start_promotion_progress(self, environment: str, total_tasks: int) -> None:
228278
)
229279

230280
def update_promotion_progress(self, num_tasks: int) -> None:
231-
"""Update promotion progress."""
281+
"""Update the snapshot promotion progress."""
232282
if self.promotion_progress is not None and self.promotion_task is not None:
233283
self.promotion_progress.update(self.promotion_task, refresh=True, advance=num_tasks)
234284

235285
def stop_promotion_progress(self, success: bool = True) -> None:
236-
"""Stop the promotion progress"""
286+
"""Stop the snapshot promotion progress."""
237287
self.promotion_task = None
238288
if self.promotion_progress is not None:
239289
self.promotion_progress.stop()
@@ -262,7 +312,7 @@ def start_migration_progress(self, total_tasks: int) -> None:
262312
)
263313

264314
def update_migration_progress(self, num_tasks: int) -> None:
265-
"""Update migration progress."""
315+
"""Update the migration progress."""
266316
if self.migration_progress is not None and self.migration_task is not None:
267317
self.migration_progress.update(self.migration_task, refresh=True, advance=num_tasks)
268318

@@ -361,7 +411,7 @@ def _show_options_after_categorization(self, plan: Plan, auto_apply: bool) -> No
361411
self._prompt_promote(plan)
362412

363413
def _prompt_categorize(self, plan: Plan, auto_apply: bool) -> None:
364-
"""Get the user's change category for the directly modified models"""
414+
"""Get the user's change category for the directly modified models."""
365415
self.show_model_difference_summary(plan.context_diff)
366416

367417
self._show_categorized_snapshots(plan)
@@ -398,7 +448,7 @@ def _show_categorized_snapshots(self, plan: Plan) -> None:
398448
self._print(tree)
399449

400450
def _show_missing_dates(self, plan: Plan) -> None:
401-
"""Displays the models with missing dates"""
451+
"""Displays the models with missing dates."""
402452
if not plan.missing_intervals:
403453
return
404454
backfill = Tree("[bold]Models needing backfill (missing dates):")
@@ -608,13 +658,14 @@ def _snapshot_change_choices(
608658

609659

610660
def add_to_layout_widget(target_widget: LayoutWidget, *widgets: widgets.Widget) -> LayoutWidget:
611-
"""Helper function to add a widget to a layout widget
661+
"""Helper function to add a widget to a layout widget.
662+
612663
Args:
613-
target_widget: The layout widget to add the other widget(s) to
614-
*widgets: The widgets to add to the layout widget
664+
target_widget: The layout widget to add the other widget(s) to.
665+
*widgets: The widgets to add to the layout widget.
615666
616667
Returns:
617-
The layout widget with the children added
668+
The layout widget with the children added.
618669
"""
619670
target_widget.children += tuple(widgets)
620671
return target_widget
@@ -914,7 +965,7 @@ def show_model_difference_summary(
914965
"""Shows a summary of the differences.
915966
916967
Args:
917-
context_diff: The context diff to use to print the summary
968+
context_diff: The context diff to use to print the summary.
918969
detailed: Show the actual SQL differences if True.
919970
"""
920971
if context_diff.is_new_environment:
@@ -972,7 +1023,7 @@ def show_model_difference_summary(
9721023
self._print("\n")
9731024

9741025
def _show_missing_dates(self, plan: Plan) -> None:
975-
"""Displays the models with missing dates"""
1026+
"""Displays the models with missing dates."""
9761027
if not plan.missing_intervals:
9771028
return
9781029
self._print("**Models needing backfill (missing dates):**\n\n")
@@ -1048,17 +1099,17 @@ def _confirm(self, message: str, **kwargs: t.Any) -> bool:
10481099
self._print(message)
10491100
return super()._confirm("", **kwargs)
10501101

1051-
def start_snapshot_progress(
1102+
def start_evaluation_progress(
10521103
self, snapshot: Snapshot, total_batches: int, environment: str
10531104
) -> None:
1054-
"""Indicates that a new load progress has begun."""
1105+
"""Indicates that a new snapshot evaluation progress has begun."""
10551106
if not self.evaluation_batch_progress.get(snapshot.name):
10561107
view_name = snapshot.qualified_view_name.for_environment(environment)
10571108
self.evaluation_batch_progress[snapshot.name] = (view_name, 0, total_batches)
10581109
print(f"Starting '{view_name}', Total batches: {total_batches}")
10591110

1060-
def update_snapshot_progress(self, snapshot_name: str, num_batches: int) -> None:
1061-
"""Update snapshot progress."""
1111+
def update_evaluation_progress(self, snapshot_name: str, num_batches: int) -> None:
1112+
"""Update the snapshot evaluation progress."""
10621113
view_name, loaded_batches, total_batches = self.evaluation_batch_progress[snapshot_name]
10631114
loaded_batches += 1
10641115
self.evaluation_batch_progress[snapshot_name] = (view_name, loaded_batches, total_batches)
@@ -1072,26 +1123,26 @@ def update_snapshot_progress(self, snapshot_name: str, num_batches: int) -> None
10721123
total = len(self.evaluation_batch_progress)
10731124
print(f"Completed Loading {total_finished_loading}/{total} Models")
10741125

1075-
def stop_snapshot_progress(self, success: bool = True) -> None:
1076-
"""Stop the load progress"""
1126+
def stop_evaluation_progress(self, success: bool = True) -> None:
1127+
"""Stop the snapshot evaluation progress."""
10771128
self.evaluation_batch_progress = {}
10781129
print(f"Loading {'succeeded' if success else 'failed'}")
10791130

10801131
def start_promotion_progress(self, environment: str, total_tasks: int) -> None:
1081-
"""Indicates that a new promotion progress has begun."""
1132+
"""Indicates that a new snapshot promotion progress has begun."""
10821133
self.promotion_status = (0, total_tasks)
10831134
print(f"Virtually Updating '{environment}'")
10841135

10851136
def update_promotion_progress(self, num_tasks: int) -> None:
1086-
"""Update promotion progress."""
1137+
"""Update the snapshot promotion progress."""
10871138
num_promotions, total_promotions = self.promotion_status
10881139
num_promotions += num_tasks
10891140
self.promotion_status = (num_promotions, total_promotions)
10901141
if num_promotions % 5 == 0:
10911142
print(f"Virtually Updated {num_promotions}/{total_promotions}")
10921143

10931144
def stop_promotion_progress(self, success: bool = True) -> None:
1094-
"""Stop the promotion progress"""
1145+
"""Stop the snapshot promotion progress."""
10951146
print(f"Virtual Update {'succeeded' if success else 'failed'}")
10961147

10971148

sqlmesh/core/context.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,9 @@ def __init__(
250250
self._test_engine_adapter = test_connection_config.create_engine_adapter()
251251

252252
self.snapshot_evaluator = SnapshotEvaluator(
253-
self.engine_adapter, ddl_concurrent_tasks=self.concurrent_tasks
253+
self.engine_adapter,
254+
ddl_concurrent_tasks=self.concurrent_tasks,
255+
console=self.console,
254256
)
255257

256258
self._provided_state_sync: t.Optional[StateSync] = state_sync

sqlmesh/core/plan/evaluator.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,24 @@ def _push(self, plan: Plan) -> None:
116116
Args:
117117
plan: The plan to source snapshots from.
118118
"""
119-
self.snapshot_evaluator.create(
120-
plan.new_snapshots, {s.snapshot_id: s for s in plan.snapshots}
121-
)
119+
snapshot_id_to_snapshot = {s.snapshot_id: s for s in plan.snapshots}
120+
121+
self.console.start_creation_progress(plan.environment.name, len(snapshot_id_to_snapshot))
122+
123+
def on_complete(snapshot: SnapshotInfoLike) -> None:
124+
self.console.update_creation_progress(1)
125+
126+
completed = False
127+
try:
128+
self.snapshot_evaluator.create(
129+
plan.new_snapshots,
130+
snapshot_id_to_snapshot,
131+
on_complete=on_complete,
132+
)
133+
completed = True
134+
finally:
135+
self.console.stop_creation_progress(success=completed)
136+
122137
self.state_sync.push_snapshots(plan.new_snapshots)
123138

124139
def _promote(self, plan: Plan) -> None:

sqlmesh/core/scheduler.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def __init__(
6363
self.snapshot_per_version = _resolve_one_snapshot_per_version(self.snapshots.values())
6464
self.snapshot_evaluator = snapshot_evaluator
6565
self.max_workers = max_workers
66-
self.console: Console = console or get_console()
66+
self.console = console or get_console()
6767
self.notification_target_manager = (
6868
notification_target_manager or NotificationTargetManager()
6969
)
@@ -167,7 +167,7 @@ def evaluate(
167167
)
168168
raise e
169169
self.state_sync.add_interval(snapshot, start, end, is_dev=is_dev)
170-
self.console.update_snapshot_progress(snapshot.name, 1)
170+
self.console.update_evaluation_progress(snapshot.name, 1)
171171

172172
def run(
173173
self,
@@ -203,7 +203,7 @@ def run(
203203
continue
204204
visited.add(snapshot)
205205
intervals = batches[snapshot]
206-
self.console.start_snapshot_progress(snapshot, len(intervals), environment)
206+
self.console.start_evaluation_progress(snapshot, len(intervals), environment)
207207

208208
def evaluate_node(node: SchedulingUnit) -> None:
209209
assert latest
@@ -221,7 +221,7 @@ def evaluate_node(node: SchedulingUnit) -> None:
221221
finally:
222222
self.state_sync.recycle()
223223

224-
self.console.stop_snapshot_progress(success=not errors)
224+
self.console.stop_evaluation_progress(success=not errors)
225225

226226
for error in errors:
227227
sid = error.node[0]

0 commit comments

Comments
 (0)