diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 6b3744db13894..741b28120df4b 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -4069,6 +4069,11 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[
         This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE"
         query, you should ensure that any scheduling decisions are made in a single transaction -- as
         soon as the transaction is committed it will be unlocked.
+
+        For dataset-triggered scheduling, Dags that have ``DatasetDagRunQueue`` rows but no matching
+        ``SerializedDagModel`` row are omitted from the returned ``dataset_triggered_dag_info`` until
+        serialization exists; DDRQs are **not** deleted here so the scheduler can re-evaluate on a
+        later run.
         """
         from airflow.models.serialized_dag import SerializedDagModel
@@ -4094,13 +4099,33 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
         ser_dags = session.scalars(
             select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
         ).all()
+        ser_dag_ids = {s.dag_id for s in ser_dags}
+        missing_from_serialized = set(by_dag.keys()) - ser_dag_ids
+        if missing_from_serialized:
+            log.info(
+                "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table."
+                " Skipping Dag run creation: %s",
+                sorted(missing_from_serialized),
+            )
+            for dag_id in missing_from_serialized:
+                del by_dag[dag_id]
+                del dag_statuses[dag_id]
         for ser_dag in ser_dags:
             dag_id = ser_dag.dag_id
             statuses = dag_statuses[dag_id]
+            dataset_condition = ser_dag.dag.timetable.dataset_condition
-            if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses):
+            if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses):
                 del by_dag[dag_id]
                 del dag_statuses[dag_id]
+            else:
+                log.debug(
+                    "Dataset condition satisfied: dag_id=%s, condition=%s, ddrq_uris=%s, ddrq_count=%d",
+                    dag_id,
+                    dataset_condition,
+                    sorted(statuses.keys()),
+                    len(statuses),
+                )
         del dag_statuses
         dataset_triggered_dag_info = {}
         for dag_id, records in by_dag.items():
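
For readers skimming the patch, the guard added in the dag.py hunk above reduces to a small pruning step over in-memory candidates. A minimal, self-contained sketch of that pattern follows; `drop_unserialized_candidates` and the sample data are illustrative stand-ins for the ORM state, not part of the patch:

    import logging

    log = logging.getLogger("airflow.models.dag")  # same logger name the patch logs under

    def drop_unserialized_candidates(by_dag: dict, dag_statuses: dict, ser_dag_ids: set) -> None:
        """Prune in-memory candidates only; DatasetDagRunQueue rows are never touched."""
        missing_from_serialized = set(by_dag) - ser_dag_ids
        if missing_from_serialized:
            # One log line with a sorted id list, as the second test below asserts.
            log.info(
                "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table."
                " Skipping Dag run creation: %s",
                sorted(missing_from_serialized),
            )
            for dag_id in missing_from_serialized:
                del by_dag[dag_id]
                del dag_statuses[dag_id]

    # Hypothetical state: "ghost" has queued dataset events but was never serialized.
    by_dag = {"ghost": ["event"], "ok": ["event"]}
    dag_statuses = {"ghost": {"s3://a": True}, "ok": {"s3://b": True}}
    drop_unserialized_candidates(by_dag, dag_statuses, ser_dag_ids={"ok"})
    assert set(by_dag) == {"ok"}  # "ghost" is skipped; its DDRQ row would remain queued
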
+ """ + orphan_dag_id = "ddr_q_no_serialized_dag" + session.add(DatasetModel(uri="dataset_for_orphan_ddrq")) + session.flush() + dataset_id = session.query(DatasetModel.id).filter_by(uri="dataset_for_orphan_ddrq").scalar() + session.add( + DagModel( + dag_id=orphan_dag_id, + max_active_tasks=1, + has_task_concurrency_limits=False, + next_dagrun=timezone.datetime(2038, 1, 1), + next_dagrun_create_after=timezone.datetime(2038, 1, 2), + is_active=True, + has_import_errors=False, + is_paused=False, + ) + ) + session.flush() + session.add(DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id=orphan_dag_id)) + session.flush() + + with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"): + _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session) + + assert orphan_dag_id not in dataset_triggered_dag_info + assert ( + "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table." + in caplog.text + ) + assert orphan_dag_id in caplog.text + assert ( + session.query(DatasetDagRunQueue) + .filter(DatasetDagRunQueue.target_dag_id == orphan_dag_id) + .count() + == 1 + ) + + def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(self, session, caplog): + """When multiple dags lack SerializedDagModel, the warning lists dag_ids sorted.""" + session.add_all( + [ + DatasetModel(uri="ds_ghost_z"), + DatasetModel(uri="ds_ghost_a"), + ] + ) + session.flush() + ds_z_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_z").scalar() + ds_a_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_a").scalar() + far = timezone.datetime(2038, 1, 1) + far_after = timezone.datetime(2038, 1, 2) + session.add_all( + [ + DagModel( + dag_id="ghost_z", + max_active_tasks=1, + has_task_concurrency_limits=False, + next_dagrun=far, + next_dagrun_create_after=far_after, + is_active=True, + has_import_errors=False, + is_paused=False, + ), + DagModel( + dag_id="ghost_a", + max_active_tasks=1, + has_task_concurrency_limits=False, + next_dagrun=far, + next_dagrun_create_after=far_after, + is_active=True, + has_import_errors=False, + is_paused=False, + ), + ] + ) + session.flush() + + session.add_all( + [ + DatasetDagRunQueue(dataset_id=ds_z_id, target_dag_id="ghost_z"), + DatasetDagRunQueue(dataset_id=ds_a_id, target_dag_id="ghost_a"), + ] + ) + session.flush() + + with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"): + _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session) + + assert "ghost_a" not in dataset_triggered_dag_info + assert "ghost_z" not in dataset_triggered_dag_info + msg = next( + r.message + for r in caplog.records + if "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table." + in r.message + ) + assert msg.index("ghost_a") < msg.index("ghost_z") + assert ( + session.query(DatasetDagRunQueue) + .filter(DatasetDagRunQueue.target_dag_id.in_(("ghost_a", "ghost_z"))) + .count() + == 2 + ) + def test_dags_needing_dagruns_dataset_aliases(self, dag_maker, session): # link dataset_alias hello_alias to dataset hello dataset_model = DatasetModel(uri="hello") @@ -3078,6 +3188,7 @@ def test_dags_needing_dagruns_dataset_aliases(self, dag_maker, session): max_active_runs=1, schedule=[DatasetAlias(name="hello_alias")], start_date=pendulum.now().add(days=-2), + serialized=True, ): EmptyOperator(task_id="dummy")