Skip to content

Commit f3b3f59

Browse files
authored
Fix: Use the interval_unit instead of cron when normalizing intervals during evaluation in Airflow (#1612)
1 parent aab5225 commit f3b3f59

2 files changed

Lines changed: 6 additions & 5 deletions

File tree

sqlmesh/schedulers/airflow/operators/hwm_sensor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ def _compute_target_high_water_mark(
6565
self, dag_run: DagRun, target_snapshot: Snapshot
6666
) -> datetime:
6767
target_date = to_datetime(dag_run.data_interval_end)
68-
target_prev = to_datetime(target_snapshot.node.cron_floor(target_date))
69-
this_prev = to_datetime(self.this_snapshot.node.cron_floor(target_date))
68+
target_prev = to_datetime(target_snapshot.node.interval_unit.cron_floor(target_date))
69+
this_prev = to_datetime(self.this_snapshot.node.interval_unit.cron_floor(target_date))
7070
return min(target_prev, this_prev)
7171

7272

sqlmesh/schedulers/airflow/operators/targets.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import abc
22
import typing as t
3-
from datetime import datetime
43

54
from airflow.exceptions import AirflowSkipException
65
from airflow.utils.context import Context
@@ -154,14 +153,16 @@ def _get_start(self, context: Context) -> TimeLike:
154153
if self.start:
155154
return self.start
156155

157-
start = t.cast(datetime, context["dag_run"].data_interval_start)
156+
start = self.snapshot.node.interval_unit.cron_floor(context["dag_run"].data_interval_start)
158157
if not self.snapshot.is_model:
159158
return start
160159

161160
return self.snapshot.model.lookback_start(start)
162161

163162
def _get_end(self, context: Context) -> TimeLike:
164-
return self.end or context["dag_run"].data_interval_end
163+
return self.end or self.snapshot.node.interval_unit.cron_floor(
164+
context["dag_run"].data_interval_end
165+
)
165166

166167
def _get_execution_time(self, context: Context) -> TimeLike:
167168
return self.execution_time or context["dag_run"].logical_date

0 commit comments

Comments
 (0)