Skip to content

Commit 1fbd17f

Browse files
authored
Chore: Report duration for each finished snapshot batch during evaluation (#1571)
1 parent c1f2477 commit 1fbd17f

5 files changed

Lines changed: 93 additions & 38 deletions

File tree

sqlmesh/core/console.py

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
7171
"""Starts the snapshot evaluation progress."""
7272

7373
@abc.abstractmethod
74-
def update_snapshot_evaluation_progress(self, snapshot: Snapshot, num_batches: int) -> None:
74+
def update_snapshot_evaluation_progress(
75+
self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
76+
) -> None:
7577
"""Updates the snapshot evaluation progress."""
7678

7779
@abc.abstractmethod
@@ -263,19 +265,29 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
263265
total=self.evaluation_model_batches[snapshot],
264266
)
265267

266-
def update_snapshot_evaluation_progress(self, snapshot: Snapshot, num_batches: int) -> None:
268+
def update_snapshot_evaluation_progress(
269+
self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
270+
) -> None:
267271
"""Update the snapshot evaluation progress."""
268-
if self.evaluation_total_progress and self.evaluation_model_progress:
272+
if (
273+
self.evaluation_total_progress
274+
and self.evaluation_model_progress
275+
and self.evaluation_progress_live
276+
):
277+
total_batches = self.evaluation_model_batches[snapshot]
278+
279+
if duration_ms:
280+
self.evaluation_progress_live.console.print(
281+
f"[{batch_idx + 1}/{total_batches}] {snapshot.name} [green]finished[/green] in {(duration_ms / 1000.0):.2f}s"
282+
)
283+
269284
self.evaluation_total_progress.update(
270-
self.evaluation_total_task or TaskID(0), refresh=True, advance=num_batches
285+
self.evaluation_total_task or TaskID(0), refresh=True, advance=1
271286
)
272287

273288
model_task_id = self.evaluation_model_tasks[snapshot.name]
274-
self.evaluation_model_progress.update(model_task_id, refresh=True, advance=num_batches)
275-
if (
276-
self.evaluation_model_progress._tasks[model_task_id].completed
277-
>= self.evaluation_model_batches[snapshot]
278-
):
289+
self.evaluation_model_progress.update(model_task_id, refresh=True, advance=1)
290+
if self.evaluation_model_progress._tasks[model_task_id].completed >= total_batches:
279291
self.evaluation_model_progress.remove_task(model_task_id)
280292

281293
def stop_evaluation_progress(self, success: bool = True) -> None:
@@ -1318,11 +1330,13 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
13181330
self.evaluation_batch_progress[snapshot.name] = (view_name, 0)
13191331
print(f"Starting '{view_name}', Total batches: {self.evaluation_batches[snapshot]}")
13201332

1321-
def update_snapshot_evaluation_progress(self, snapshot: Snapshot, num_batches: int) -> None:
1333+
def update_snapshot_evaluation_progress(
1334+
self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
1335+
) -> None:
13221336
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.name]
13231337
total_batches = self.evaluation_batches[snapshot]
13241338

1325-
loaded_batches += num_batches
1339+
loaded_batches += 1
13261340
self.evaluation_batch_progress[snapshot.name] = (view_name, loaded_batches)
13271341

13281342
finished_loading = loaded_batches == total_batches

sqlmesh/core/plan/evaluator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,8 @@ def evaluate(self, plan: Plan) -> None:
263263
if not plan_application_succeeded:
264264
raise SQLMeshError("Plan application failed.")
265265

266+
self.console.log_success("The plan has been applied successfully")
267+
266268
@property
267269
def client(self) -> BaseAirflowClient:
268270
raise NotImplementedError

sqlmesh/core/scheduler.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,20 @@
2323
from sqlmesh.utils import format_exception
2424
from sqlmesh.utils.concurrency import concurrent_apply_to_dag
2525
from sqlmesh.utils.dag import DAG
26-
from sqlmesh.utils.date import TimeLike, now, to_datetime, validate_date_range
26+
from sqlmesh.utils.date import (
27+
TimeLike,
28+
now,
29+
now_timestamp,
30+
to_datetime,
31+
validate_date_range,
32+
)
2733
from sqlmesh.utils.errors import AuditError, SQLMeshError
2834

2935
logger = logging.getLogger(__name__)
3036
Interval = t.Tuple[datetime, datetime]
3137
Batch = t.List[Interval]
3238
SnapshotToBatches = t.Dict[Snapshot, Batch]
33-
SchedulingUnit = t.Tuple[Snapshot, Interval]
39+
SchedulingUnit = t.Tuple[Snapshot, t.Tuple[Interval, int]]
3440

3541

3642
class Scheduler:
@@ -246,12 +252,20 @@ def run(
246252

247253
def evaluate_node(node: SchedulingUnit) -> None:
248254
assert execution_time
249-
snapshot, (start, end) = node
255+
256+
snapshot, ((start, end), batch_idx) = node
250257
self.console.start_snapshot_evaluation_progress(snapshot)
258+
259+
execution_start_ts = now_timestamp()
260+
evaluation_duration_ms: t.Optional[int] = None
261+
251262
try:
252263
self.evaluate(snapshot, start, end, execution_time, is_dev=is_dev)
264+
evaluation_duration_ms = now_timestamp() - execution_start_ts
253265
finally:
254-
self.console.update_snapshot_evaluation_progress(snapshot, 1)
266+
self.console.update_snapshot_evaluation_progress(
267+
snapshot, batch_idx, evaluation_duration_ms
268+
)
255269

256270
try:
257271
with self.snapshot_evaluator.concurrent_context():
@@ -297,23 +311,25 @@ def _dag(self, batches: SnapshotToBatches) -> DAG[SchedulingUnit]:
297311
if not intervals:
298312
continue
299313
upstream_dependencies = [
300-
(self.snapshots[p_sid], interval)
314+
(self.snapshots[p_sid], (interval, i))
301315
for p_sid in snapshot.parents
302316
if p_sid in self.snapshots
303-
for interval in intervals_per_snapshot_version.get(
304-
(
305-
self.snapshots[p_sid].name,
306-
self.snapshots[p_sid].version_get_or_generate(),
307-
),
308-
[],
317+
for i, interval in enumerate(
318+
intervals_per_snapshot_version.get(
319+
(
320+
self.snapshots[p_sid].name,
321+
self.snapshots[p_sid].version_get_or_generate(),
322+
),
323+
[],
324+
)
309325
)
310326
]
311327
for i, interval in enumerate(intervals):
312-
dag.add((snapshot, interval), upstream_dependencies)
328+
dag.add((snapshot, (interval, i)), upstream_dependencies)
313329
if snapshot.depends_on_past:
314330
dag.add(
315-
(snapshot, interval),
316-
[(snapshot, _interval) for _interval in intervals[:i]],
331+
(snapshot, (interval, i)),
332+
[(snapshot, (_interval, _i)) for _i, _interval in enumerate(intervals[:i])],
317333
)
318334
return dag
319335

tests/core/test_scheduler.py

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,10 @@ def test_incremental_by_unique_key_kind_dag(mocker: MockerFixture, make_snapshot
124124
dag = scheduler._dag(batches)
125125
assert dag.graph == {
126126
# Depends on no one
127-
(unique_by_key_snapshot, (to_datetime("2023-01-01"), to_datetime("2023-01-07"))): set(),
127+
(
128+
unique_by_key_snapshot,
129+
((to_datetime("2023-01-01"), to_datetime("2023-01-07")), 0),
130+
): set(),
128131
}
129132
mock_state_sync.refresh_snapshot_intervals.assert_called_once()
130133

@@ -161,17 +164,35 @@ def test_incremental_time_self_reference_dag(mocker: MockerFixture, make_snapsho
161164

162165
assert dag.graph == {
163166
# Only run one day at a time and each day relies on the previous days
164-
(incremental_self_snapshot, (to_datetime("2023-01-01"), to_datetime("2023-01-02"))): set(),
165-
(incremental_self_snapshot, (to_datetime("2023-01-03"), to_datetime("2023-01-04"))): {
166-
(incremental_self_snapshot, (to_datetime("2023-01-01"), to_datetime("2023-01-02")))
167+
(
168+
incremental_self_snapshot,
169+
((to_datetime("2023-01-01"), to_datetime("2023-01-02")), 0),
170+
): set(),
171+
(incremental_self_snapshot, ((to_datetime("2023-01-03"), to_datetime("2023-01-04")), 1)): {
172+
(incremental_self_snapshot, ((to_datetime("2023-01-01"), to_datetime("2023-01-02")), 0))
167173
},
168-
(incremental_self_snapshot, (to_datetime("2023-01-04"), to_datetime("2023-01-05"))): {
169-
(incremental_self_snapshot, (to_datetime("2023-01-01"), to_datetime("2023-01-02"))),
170-
(incremental_self_snapshot, (to_datetime("2023-01-03"), to_datetime("2023-01-04"))),
174+
(incremental_self_snapshot, ((to_datetime("2023-01-04"), to_datetime("2023-01-05")), 2)): {
175+
(
176+
incremental_self_snapshot,
177+
((to_datetime("2023-01-01"), to_datetime("2023-01-02")), 0),
178+
),
179+
(
180+
incremental_self_snapshot,
181+
((to_datetime("2023-01-03"), to_datetime("2023-01-04")), 1),
182+
),
171183
},
172-
(incremental_self_snapshot, (to_datetime("2023-01-06"), to_datetime("2023-01-07"))): {
173-
(incremental_self_snapshot, (to_datetime("2023-01-01"), to_datetime("2023-01-02"))),
174-
(incremental_self_snapshot, (to_datetime("2023-01-03"), to_datetime("2023-01-04"))),
175-
(incremental_self_snapshot, (to_datetime("2023-01-04"), to_datetime("2023-01-05"))),
184+
(incremental_self_snapshot, ((to_datetime("2023-01-06"), to_datetime("2023-01-07")), 3)): {
185+
(
186+
incremental_self_snapshot,
187+
((to_datetime("2023-01-01"), to_datetime("2023-01-02")), 0),
188+
),
189+
(
190+
incremental_self_snapshot,
191+
((to_datetime("2023-01-03"), to_datetime("2023-01-04")), 1),
192+
),
193+
(
194+
incremental_self_snapshot,
195+
((to_datetime("2023-01-04"), to_datetime("2023-01-05")), 2),
196+
),
176197
},
177198
}

web/server/console.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,12 @@ def start_evaluation_progress(
5555
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
5656
pass
5757

58-
def update_snapshot_evaluation_progress(self, snapshot: Snapshot, num_batches: int) -> None:
58+
def update_snapshot_evaluation_progress(
59+
self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
60+
) -> None:
5961
"""Update snapshot evaluation progress."""
6062
if self.current_task_status:
61-
self.current_task_status[snapshot.name]["completed"] += num_batches
63+
self.current_task_status[snapshot.name]["completed"] += 1
6264
if (
6365
self.current_task_status[snapshot.name]["completed"]
6466
>= self.current_task_status[snapshot.name]["total"]

0 commit comments

Comments
 (0)