Skip to content

Commit cb09e65

Browse files
committed
fix(scheduler): skip asset-triggered dags without SerializedDagModel in dags_needing_dagruns
Remove the affected dag_ids from the in-memory candidate set until serialization exists; retain the AssetDagRunQueue rows and emit DEBUG-level logs so the scheduler can re-evaluate them on a later run. Add unit tests and a bugfix newsfragment.
1 parent 95fc11e commit cb09e65

2 files changed

Lines changed: 129 additions & 1 deletion

File tree

airflow-core/src/airflow/models/dag.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,10 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Any, dict[str, datetime
631631
you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
632632
transaction is committed it will be unlocked.
633633
634+
For asset-triggered scheduling, DAGs that have ``AssetDagRunQueue`` rows but no matching
635+
``SerializedDagModel`` row are omitted from ``triggered_date_by_dag`` until serialization exists;
636+
queue rows are **not** deleted here so the scheduler can re-evaluate on a later run.
637+
634638
:meta private:
635639
"""
636640
from airflow.models.serialized_dag import SerializedDagModel
@@ -677,6 +681,16 @@ def dag_ready(dag_id: str, cond: SerializedAssetBase, statuses: dict[UKey, bool]
677681
for dag_id, adrqs in adrq_by_dag.items()
678682
}
679683
ser_dags = SerializedDagModel.get_latest_serialized_dags(dag_ids=list(dag_statuses), session=session)
684+
ser_dag_ids = {ser_dag.dag_id for ser_dag in ser_dags}
685+
missing_from_serialized = set(adrq_by_dag.keys()) - ser_dag_ids
686+
if missing_from_serialized:
687+
log.debug(
688+
"DAGs in ADRQ but missing SerializedDagModel (skipping — condition cannot be evaluated): %s",
689+
sorted(missing_from_serialized),
690+
)
691+
for dag_id in missing_from_serialized:
692+
del adrq_by_dag[dag_id]
693+
del dag_statuses[dag_id]
680694
for ser_dag in ser_dags:
681695
dag_id = ser_dag.dag_id
682696
statuses = dag_statuses[dag_id]

airflow-core/tests/unit/models/test_dag.py

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import pendulum
3434
import pytest
3535
import time_machine
36-
from sqlalchemy import delete, inspect, select, update
36+
from sqlalchemy import delete, func, inspect, select, update
3737

3838
from airflow import settings
3939
from airflow._shared.module_loading import qualname
@@ -2047,6 +2047,120 @@ def test_dags_needing_dagruns_assets(self, dag_maker, session):
20472047
dag_models = query.all()
20482048
assert dag_models == [dag_model]
20492049

2050+
def test_dags_needing_dagruns_skips_adrq_when_serialized_dag_missing(
2051+
self, session, caplog, testing_dag_bundle
2052+
):
2053+
"""ADRQ rows for a dag_id without SerializedDagModel must be skipped (no triggered_date_by_dag).
2054+
2055+
Rows must remain in ``asset_dag_run_queue`` so the scheduler can re-evaluate on a later run once
2056+
``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops them from the in-memory
2057+
candidate set, it does not delete ORM rows).
2058+
"""
2059+
orphan_dag_id = "adrq_no_serialized_dag"
2060+
orphan_uri = "test://asset_for_orphan_adrq"
2061+
session.add(AssetModel(uri=orphan_uri))
2062+
session.flush()
2063+
asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == orphan_uri))
2064+
session.add(
2065+
DagModel(
2066+
dag_id=orphan_dag_id,
2067+
bundle_name="testing",
2068+
max_active_tasks=1,
2069+
has_task_concurrency_limits=False,
2070+
max_consecutive_failed_dag_runs=0,
2071+
next_dagrun=timezone.datetime(2038, 1, 1),
2072+
next_dagrun_create_after=timezone.datetime(2038, 1, 2),
2073+
is_stale=False,
2074+
has_import_errors=False,
2075+
is_paused=False,
2076+
asset_expression={"any": [{"uri": orphan_uri}]},
2077+
)
2078+
)
2079+
session.add(AssetDagRunQueue(asset_id=asset_id, target_dag_id=orphan_dag_id))
2080+
session.flush()
2081+
2082+
with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
2083+
_query, triggered_date_by_dag = DagModel.dags_needing_dagruns(session)
2084+
2085+
assert orphan_dag_id not in triggered_date_by_dag
2086+
assert "DAGs in ADRQ but missing SerializedDagModel" in caplog.text
2087+
assert orphan_dag_id in caplog.text
2088+
assert (
2089+
session.scalar(
2090+
select(func.count())
2091+
.select_from(AssetDagRunQueue)
2092+
.where(AssetDagRunQueue.target_dag_id == orphan_dag_id)
2093+
)
2094+
== 1
2095+
)
2096+
2097+
def test_dags_needing_dagruns_missing_serialized_debug_lists_sorted_dag_ids(
2098+
self, session, caplog, testing_dag_bundle
2099+
):
2100+
"""When multiple dags lack SerializedDagModel, the debug log lists dag_ids sorted."""
2101+
session.add_all(
2102+
[
2103+
AssetModel(uri="test://ds_ghost_z"),
2104+
AssetModel(uri="test://ds_ghost_a"),
2105+
]
2106+
)
2107+
session.flush()
2108+
id_z = session.scalar(select(AssetModel.id).where(AssetModel.uri == "test://ds_ghost_z"))
2109+
id_a = session.scalar(select(AssetModel.id).where(AssetModel.uri == "test://ds_ghost_a"))
2110+
far = timezone.datetime(2038, 1, 1)
2111+
far_after = timezone.datetime(2038, 1, 2)
2112+
session.add_all(
2113+
[
2114+
DagModel(
2115+
dag_id="ghost_z",
2116+
bundle_name="testing",
2117+
max_active_tasks=1,
2118+
has_task_concurrency_limits=False,
2119+
max_consecutive_failed_dag_runs=0,
2120+
next_dagrun=far,
2121+
next_dagrun_create_after=far_after,
2122+
is_stale=False,
2123+
has_import_errors=False,
2124+
is_paused=False,
2125+
asset_expression={"any": [{"uri": "test://ds_ghost_z"}]},
2126+
),
2127+
DagModel(
2128+
dag_id="ghost_a",
2129+
bundle_name="testing",
2130+
max_active_tasks=1,
2131+
has_task_concurrency_limits=False,
2132+
max_consecutive_failed_dag_runs=0,
2133+
next_dagrun=far,
2134+
next_dagrun_create_after=far_after,
2135+
is_stale=False,
2136+
has_import_errors=False,
2137+
is_paused=False,
2138+
asset_expression={"any": [{"uri": "test://ds_ghost_a"}]},
2139+
),
2140+
AssetDagRunQueue(asset_id=id_z, target_dag_id="ghost_z"),
2141+
AssetDagRunQueue(asset_id=id_a, target_dag_id="ghost_a"),
2142+
]
2143+
)
2144+
session.flush()
2145+
2146+
with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
2147+
_query, triggered_date_by_dag = DagModel.dags_needing_dagruns(session)
2148+
2149+
assert "ghost_a" not in triggered_date_by_dag
2150+
assert "ghost_z" not in triggered_date_by_dag
2151+
msg = next(
2152+
r.message for r in caplog.records if "DAGs in ADRQ but missing SerializedDagModel" in r.message
2153+
)
2154+
assert msg.index("ghost_a") < msg.index("ghost_z")
2155+
assert (
2156+
session.scalar(
2157+
select(func.count())
2158+
.select_from(AssetDagRunQueue)
2159+
.where(AssetDagRunQueue.target_dag_id.in_(("ghost_a", "ghost_z")))
2160+
)
2161+
== 2
2162+
)
2163+
20502164
def test_dags_needing_dagruns_query_count(self, dag_maker, session):
20512165
"""Test that dags_needing_dagruns avoids N+1 on adrq.asset access."""
20522166
num_assets = 10

0 commit comments

Comments
 (0)