Skip to content

Commit a6e98d0

Browse files
github-actions[bot]leossantosLee-Wjscheffl
authored andcommitted
[v3-2-test] fix(scheduler): skip asset-triggered dags without SerializedDagModel (#64322) (#64738)
* fix(scheduler): skip asset-triggered dags without SerializedDagModel in dags_needing_dagruns Remove those dag_ids from the in-memory candidate set until serialization exists; retain AssetDagRunQueue rows and emit DEBUG logs. Add unit tests and a bugfix newsfragment. * fix(scheduler): prevent premature asset-triggered DagRuns when SerializedDagModel is unavailable * test(dag): persist DagModel before AssetDagRunQueue in unit tests Split DagModel and AssetDagRunQueue inserts and flush after DagModel so foreign-key order matches production DB constraints in TestDagModel. * Apply suggestions from code review * refactor(dag): clarify ADRQ skip log and condense serialized-DAG guard Combine the missing-from-serialized set check with a walrus assignment and improve the debug message when DagRun creation is skipped for DAGs with queued asset events but no SerializedDagModel row. * test(models): align caplog assertions with updated serialized dag warnings * test(dag): align ADRQ missing-serialized log assertion with message text * Apply suggestion from @jscheffl * chore(newsfragments): remove 64322.bugfix.rst --------- (cherry picked from commit b91394a) Co-authored-by: Leonardo Soares <leoss33@outlook.com.br> Co-authored-by: Wei Lee <weilee.rx@gmail.com> Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
1 parent c49a99b commit a6e98d0

2 files changed

Lines changed: 143 additions & 1 deletion

File tree

airflow-core/src/airflow/models/dag.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,10 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Any, dict[str, datetime
630630
you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
631631
transaction is committed it will be unlocked.
632632
633+
For asset-triggered scheduling, Dags that have ``AssetDagRunQueue`` rows but no matching
634+
``SerializedDagModel`` row are omitted from ``triggered_date_by_dag`` until serialization exists;
635+
ADRQs are **not** deleted here so the scheduler can re-evaluate on a later run.
636+
633637
:meta private:
634638
"""
635639
from airflow.models.serialized_dag import SerializedDagModel
@@ -676,6 +680,16 @@ def dag_ready(dag_id: str, cond: SerializedAssetBase, statuses: dict[UKey, bool]
676680
for dag_id, adrqs in adrq_by_dag.items()
677681
}
678682
ser_dags = SerializedDagModel.get_latest_serialized_dags(dag_ids=list(dag_statuses), session=session)
683+
ser_dag_ids = {ser_dag.dag_id for ser_dag in ser_dags}
684+
if missing_from_serialized := set(adrq_by_dag.keys()) - ser_dag_ids:
685+
log.info(
686+
"Dags have queued asset events (ADRQ), but are not found in the serialized_dag table."
687+
" — skipping Dag run creation: %s",
688+
sorted(missing_from_serialized),
689+
)
690+
for dag_id in missing_from_serialized:
691+
del adrq_by_dag[dag_id]
692+
del dag_statuses[dag_id]
679693
for ser_dag in ser_dags:
680694
dag_id = ser_dag.dag_id
681695
statuses = dag_statuses[dag_id]

airflow-core/tests/unit/models/test_dag.py

Lines changed: 129 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import pendulum
3434
import pytest
3535
import time_machine
36-
from sqlalchemy import delete, inspect, select, update
36+
from sqlalchemy import delete, func, inspect, select, update
3737

3838
from airflow import settings
3939
from airflow._shared.module_loading import qualname
@@ -2047,6 +2047,134 @@ def test_dags_needing_dagruns_assets(self, dag_maker, session):
20472047
dag_models = query.all()
20482048
assert dag_models == [dag_model]
20492049

2050+
def test_dags_needing_dagruns_skips_adrq_when_serialized_dag_missing(
2051+
self, session, caplog, testing_dag_bundle
2052+
):
2053+
"""ADRQ rows for a Dag without SerializedDagModel must be skipped (no triggered_date_by_dag).
2054+
2055+
Rows must remain in ``asset_dag_run_queue`` so the scheduler can re-evaluate on a later run once
2056+
``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops them from the in-memory
2057+
candidate set, it does not delete ORM rows).
2058+
"""
2059+
orphan_dag_id = "adrq_no_serialized_dag"
2060+
orphan_uri = "test://asset_for_orphan_adrq"
2061+
session.add(AssetModel(uri=orphan_uri))
2062+
session.flush()
2063+
asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == orphan_uri))
2064+
2065+
dag_model = DagModel(
2066+
dag_id=orphan_dag_id,
2067+
bundle_name="testing",
2068+
max_active_tasks=1,
2069+
has_task_concurrency_limits=False,
2070+
max_consecutive_failed_dag_runs=0,
2071+
next_dagrun=timezone.datetime(2038, 1, 1),
2072+
next_dagrun_create_after=timezone.datetime(2038, 1, 2),
2073+
is_stale=False,
2074+
has_import_errors=False,
2075+
is_paused=False,
2076+
asset_expression={"any": [{"uri": orphan_uri}]},
2077+
)
2078+
session.add(dag_model)
2079+
session.flush()
2080+
2081+
session.add(AssetDagRunQueue(asset_id=asset_id, target_dag_id=orphan_dag_id))
2082+
session.flush()
2083+
2084+
with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
2085+
_query, triggered_date_by_dag = DagModel.dags_needing_dagruns(session)
2086+
2087+
assert orphan_dag_id not in triggered_date_by_dag
2088+
assert (
2089+
"Dags have queued asset events (ADRQ), but are not found in the serialized_dag table."
2090+
in caplog.text
2091+
)
2092+
assert orphan_dag_id in caplog.text
2093+
assert (
2094+
session.scalar(
2095+
select(func.count())
2096+
.select_from(AssetDagRunQueue)
2097+
.where(AssetDagRunQueue.target_dag_id == orphan_dag_id)
2098+
)
2099+
== 1
2100+
)
2101+
2102+
def test_dags_needing_dagruns_missing_serialized_debug_lists_sorted_dag_ids(
2103+
self, session, caplog, testing_dag_bundle
2104+
):
2105+
"""When multiple dags lack SerializedDagModel, the debug log lists dag_ids sorted."""
2106+
session.add_all(
2107+
[
2108+
AssetModel(uri="test://ds_ghost_z"),
2109+
AssetModel(uri="test://ds_ghost_a"),
2110+
]
2111+
)
2112+
session.flush()
2113+
id_z = session.scalar(select(AssetModel.id).where(AssetModel.uri == "test://ds_ghost_z"))
2114+
id_a = session.scalar(select(AssetModel.id).where(AssetModel.uri == "test://ds_ghost_a"))
2115+
far = timezone.datetime(2038, 1, 1)
2116+
far_after = timezone.datetime(2038, 1, 2)
2117+
session.add_all(
2118+
[
2119+
DagModel(
2120+
dag_id="ghost_z",
2121+
bundle_name="testing",
2122+
max_active_tasks=1,
2123+
has_task_concurrency_limits=False,
2124+
max_consecutive_failed_dag_runs=0,
2125+
next_dagrun=far,
2126+
next_dagrun_create_after=far_after,
2127+
is_stale=False,
2128+
has_import_errors=False,
2129+
is_paused=False,
2130+
asset_expression={"any": [{"uri": "test://ds_ghost_z"}]},
2131+
),
2132+
DagModel(
2133+
dag_id="ghost_a",
2134+
bundle_name="testing",
2135+
max_active_tasks=1,
2136+
has_task_concurrency_limits=False,
2137+
max_consecutive_failed_dag_runs=0,
2138+
next_dagrun=far,
2139+
next_dagrun_create_after=far_after,
2140+
is_stale=False,
2141+
has_import_errors=False,
2142+
is_paused=False,
2143+
asset_expression={"any": [{"uri": "test://ds_ghost_a"}]},
2144+
),
2145+
]
2146+
)
2147+
session.flush()
2148+
2149+
session.add_all(
2150+
[
2151+
AssetDagRunQueue(asset_id=id_z, target_dag_id="ghost_z"),
2152+
AssetDagRunQueue(asset_id=id_a, target_dag_id="ghost_a"),
2153+
]
2154+
)
2155+
session.flush()
2156+
2157+
with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
2158+
_query, triggered_date_by_dag = DagModel.dags_needing_dagruns(session)
2159+
2160+
assert "ghost_a" not in triggered_date_by_dag
2161+
assert "ghost_z" not in triggered_date_by_dag
2162+
msg = next(
2163+
r.message
2164+
for r in caplog.records
2165+
if "Dags have queued asset events (ADRQ), but are not found in the serialized_dag table."
2166+
in r.message
2167+
)
2168+
assert msg.index("ghost_a") < msg.index("ghost_z")
2169+
assert (
2170+
session.scalar(
2171+
select(func.count())
2172+
.select_from(AssetDagRunQueue)
2173+
.where(AssetDagRunQueue.target_dag_id.in_(("ghost_a", "ghost_z")))
2174+
)
2175+
== 2
2176+
)
2177+
20502178
def test_dags_needing_dagruns_query_count(self, dag_maker, session):
20512179
"""Test that dags_needing_dagruns avoids N+1 on adrq.asset access."""
20522180
num_assets = 10

0 commit comments

Comments
 (0)