From 75ef8b598212ebc26bb541def58c0ef4c4776cd3 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Fri, 13 Mar 2026 12:04:59 -0300 Subject: [PATCH 01/17] feat(scheduler): add INFO logging for dataset-triggered DagRun creation Log which DAGs are selected as dataset-triggered (with ADRQ timestamp ranges) and log successful DagRun creation with dag_id, exec_date, prev_exec, event count, and event URIs. This provides visibility into the scheduler's dataset trigger decisions for debugging premature trigger incidents. Made-with: Cursor --- airflow/jobs/scheduler_job_runner.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 2725dd71d9f3b..28b467b001b30 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1326,6 +1326,13 @@ def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Sessio non_dataset_dags = all_dags_needing_dag_runs.difference(dataset_triggered_dags) self._create_dag_runs(non_dataset_dags, session) if dataset_triggered_dags: + self.log.info( + "Dataset-triggered DAGs ready: %s", + { + dag_id: (str(first), str(last)) + for dag_id, (first, last) in dataset_triggered_dag_info.items() + }, + ) self._create_dag_runs_dataset_triggered( dataset_triggered_dags, dataset_triggered_dag_info, session ) @@ -1502,6 +1509,15 @@ def _create_dag_runs_dataset_triggered( ) Stats.incr("dataset.triggered_dagruns") dag_run.consumed_dataset_events.extend(dataset_events) + self.log.info( + "Dataset-triggered DagRun created: dag_id=%s, exec_date=%s, " + "prev_exec=%s, events_consumed=%d, event_uris=%s", + dag.dag_id, + exec_date, + previous_dag_run.execution_date if previous_dag_run else None, + len(dataset_events), + sorted({e.dataset.uri for e in dataset_events}), + ) session.execute( delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id) ) From 7b7e6a857f102cb42ebdf050ccfa8557e23e59f8 Mon 
Sep 17 00:00:00 2001 From: Leonardo Soares Date: Mon, 16 Mar 2026 12:16:07 -0300 Subject: [PATCH 02/17] feat(scheduler): add deep diagnostic logging for dataset trigger decisions Log the full context of dataset-triggered scheduling to debug premature trigger incidents: - P0: Log condition, DDRQ URIs, and count when dataset_condition is satisfied (INFO in dags_needing_dagruns) - P1: Warn on DDRQ/event mismatch when queued URIs have no matching DatasetEvent in the timestamp range (WARNING in _create_dag_runs_dataset_triggered) - P2: Include data_interval start/end in the DagRun creation log - P3: Log consumed event timestamps and source DAG/run_id (DEBUG) Made-with: Cursor --- airflow/jobs/scheduler_job_runner.py | 39 +++++++++++++++++++++++++++- airflow/models/dag.py | 12 ++++++++- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 28b467b001b30..72300a0972c6c 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1487,6 +1487,26 @@ def _create_dag_runs_dataset_triggered( .where(*dataset_event_filters) ).all() + ddrq_records = session.scalars( + select(DatasetDagRunQueue).where( + DatasetDagRunQueue.target_dag_id == dag.dag_id + ) + ).all() + ddrq_uris = {r.dataset.uri for r in ddrq_records} + consumed_uris = {e.dataset.uri for e in dataset_events} + missing_uris = ddrq_uris - consumed_uris + if missing_uris: + self.log.warning( + "DDRQ/event mismatch: dag_id=%s has DDRQ URIs %s with no matching " + "DatasetEvent in range (prev_exec=%s, exec_date=%s]. " + "Consumed URIs: %s. 
Possible stale DDRQ records.", + dag.dag_id, + sorted(missing_uris), + previous_dag_run.execution_date if previous_dag_run else None, + exec_date, + sorted(consumed_uris), + ) + data_interval = dag.timetable.data_interval_for_events(exec_date, dataset_events) run_id = dag.timetable.generate_run_id( run_type=DagRunType.DATASET_TRIGGERED, @@ -1511,13 +1531,30 @@ def _create_dag_runs_dataset_triggered( dag_run.consumed_dataset_events.extend(dataset_events) self.log.info( "Dataset-triggered DagRun created: dag_id=%s, exec_date=%s, " - "prev_exec=%s, events_consumed=%d, event_uris=%s", + "prev_exec=%s, data_interval=(%s, %s), " + "events_consumed=%d, event_uris=%s", dag.dag_id, exec_date, previous_dag_run.execution_date if previous_dag_run else None, + data_interval.start, + data_interval.end, len(dataset_events), sorted({e.dataset.uri for e in dataset_events}), ) + if dataset_events: + event_timestamps = [e.timestamp for e in dataset_events] + self.log.debug( + "Consumed event details: dag_id=%s, " + "event_ts_range=(%s, %s), " + "events=[%s]", + dag.dag_id, + min(event_timestamps), + max(event_timestamps), + ", ".join( + f"{e.dataset.uri}|ts={e.timestamp}|src={e.source_dag_id}/{e.source_run_id}" + for e in dataset_events + ), + ) session.execute( delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id) ) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 6b3744db13894..b2984560bc5d6 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -4097,10 +4097,20 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None: for ser_dag in ser_dags: dag_id = ser_dag.dag_id statuses = dag_statuses[dag_id] + dataset_condition = ser_dag.dag.timetable.dataset_condition - if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses): + if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses): del by_dag[dag_id] del dag_statuses[dag_id] + else: + log.info( + "Dataset 
condition satisfied: dag_id=%s, condition=%s, " + "ddrq_uris=%s, ddrq_count=%d", + dag_id, + dataset_condition, + sorted(statuses.keys()), + len(statuses), + ) del dag_statuses dataset_triggered_dag_info = {} for dag_id, records in by_dag.items(): From 5fbf7728e14b73cd020700b0fdb9eec86eb5c885 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Mon, 16 Mar 2026 15:28:19 -0300 Subject: [PATCH 03/17] chore(scheduler): Add [DEBUG DATASETS] tag to logs Made-with: Cursor --- airflow/jobs/scheduler_job_runner.py | 8 ++++---- airflow/models/dag.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 72300a0972c6c..1a8aeb3dde72c 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1327,7 +1327,7 @@ def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Sessio self._create_dag_runs(non_dataset_dags, session) if dataset_triggered_dags: self.log.info( - "Dataset-triggered DAGs ready: %s", + "[DEBUG DATASETS] Dataset-triggered DAGs ready: %s", { dag_id: (str(first), str(last)) for dag_id, (first, last) in dataset_triggered_dag_info.items() @@ -1497,7 +1497,7 @@ def _create_dag_runs_dataset_triggered( missing_uris = ddrq_uris - consumed_uris if missing_uris: self.log.warning( - "DDRQ/event mismatch: dag_id=%s has DDRQ URIs %s with no matching " + "[DEBUG DATASETS] DDRQ/event mismatch: dag_id=%s has DDRQ URIs %s with no matching " "DatasetEvent in range (prev_exec=%s, exec_date=%s]. " "Consumed URIs: %s. 
Possible stale DDRQ records.", dag.dag_id, @@ -1530,7 +1530,7 @@ def _create_dag_runs_dataset_triggered( Stats.incr("dataset.triggered_dagruns") dag_run.consumed_dataset_events.extend(dataset_events) self.log.info( - "Dataset-triggered DagRun created: dag_id=%s, exec_date=%s, " + "[DEBUG DATASETS] Dataset-triggered DagRun created: dag_id=%s, exec_date=%s, " "prev_exec=%s, data_interval=(%s, %s), " "events_consumed=%d, event_uris=%s", dag.dag_id, @@ -1544,7 +1544,7 @@ def _create_dag_runs_dataset_triggered( if dataset_events: event_timestamps = [e.timestamp for e in dataset_events] self.log.debug( - "Consumed event details: dag_id=%s, " + "[DEBUG DATASETS] Consumed event details: dag_id=%s, " "event_ts_range=(%s, %s), " "events=[%s]", dag.dag_id, diff --git a/airflow/models/dag.py b/airflow/models/dag.py index b2984560bc5d6..64cdd0497d286 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -4104,7 +4104,7 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None: del dag_statuses[dag_id] else: log.info( - "Dataset condition satisfied: dag_id=%s, condition=%s, " + "[DEBUG DATASETS] Dataset condition satisfied: dag_id=%s, condition=%s, " "ddrq_uris=%s, ddrq_count=%d", dag_id, dataset_condition, From 735e145d60f8161d7a4d20a0a8a1226c69c9c3df Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Fri, 20 Mar 2026 10:55:49 -0300 Subject: [PATCH 04/17] fix(scheduler): guard against condition bypass when SerializedDagModel is missing DAGs with DDRQ entries but no corresponding SerializedDagModel were bypassing dataset condition evaluation in dags_needing_dagruns() and entering dataset_triggered_dag_info unchecked. This caused premature triggers with partial events when the DAG processor was mid-parse cycle. Now explicitly detects the mismatch and excludes those DAGs from the current scheduler loop. DDRQ entries are preserved so the DAG is re-evaluated on the next heartbeat (~5s) once serialization completes. 
Made-with: Cursor --- airflow/models/dag.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 64cdd0497d286..f0dd1e906f0c3 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -4094,6 +4094,17 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None: ser_dags = session.scalars( select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys())) ).all() + ser_dag_ids = {s.dag_id for s in ser_dags} + missing_from_serialized = set(by_dag.keys()) - ser_dag_ids + if missing_from_serialized: + log.warning( + "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel " + "(skipping — condition cannot be evaluated): %s", + sorted(missing_from_serialized), + ) + for dag_id in missing_from_serialized: + del by_dag[dag_id] + del dag_statuses[dag_id] for ser_dag in ser_dags: dag_id = ser_dag.dag_id statuses = dag_statuses[dag_id] From bed99386a2a3625f979e61ed47116a6640df0dc3 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Fri, 20 Mar 2026 15:02:24 -0300 Subject: [PATCH 05/17] fix(dag): remove missing_from_serialized after processing serialized DAGs This change ensures that the `missing_from_serialized` variable is deleted after its entries have been processed, preventing potential memory leaks and maintaining cleaner state management within the DAG model. 
--- airflow/models/dag.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index f0dd1e906f0c3..80b48cb5cf25e 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -4105,6 +4105,7 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None: for dag_id in missing_from_serialized: del by_dag[dag_id] del dag_statuses[dag_id] + del missing_from_serialized for ser_dag in ser_dags: dag_id = ser_dag.dag_id statuses = dag_statuses[dag_id] From e427302328365b3e4534fca26716bc7222272744 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Fri, 20 Mar 2026 17:25:51 -0300 Subject: [PATCH 06/17] test(models): add tests for DDRQ entries without SerializedDagModel --- tests/models/test_dag.py | 81 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 5f721b61d2691..9b6d623cc878b 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -3064,6 +3064,87 @@ def test_dags_needing_dagruns_datasets(self, dag_maker, session): dag_models = query.all() assert dag_models == [dag_model] + def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, session, caplog): + """DDRQ rows for a dag_id without SerializedDagModel must be skipped (no dataset_triggered info).""" + orphan_dag_id = "ddr_q_no_serialized_dag" + session.add(DatasetModel(uri="dataset_for_orphan_ddrq")) + session.flush() + dataset_id = session.query(DatasetModel.id).filter_by(uri="dataset_for_orphan_ddrq").scalar() + session.add( + DagModel( + dag_id=orphan_dag_id, + max_active_tasks=1, + has_task_concurrency_limits=False, + next_dagrun=timezone.datetime(2038, 1, 1), + next_dagrun_create_after=timezone.datetime(2038, 1, 2), + is_active=True, + has_import_errors=False, + is_paused=False, + ) + ) + session.add(DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id=orphan_dag_id)) + session.flush() + + with 
caplog.at_level(logging.WARNING, logger="airflow.models.dag"): + _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session) + + assert orphan_dag_id not in dataset_triggered_dag_info + assert "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel" in caplog.text + assert orphan_dag_id in caplog.text + + def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(self, session, caplog): + """When multiple dags lack SerializedDagModel, the warning lists dag_ids sorted.""" + session.add_all( + [ + DatasetModel(uri="ds_ghost_z"), + DatasetModel(uri="ds_ghost_a"), + ] + ) + session.flush() + ds_z_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_z").scalar() + ds_a_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_a").scalar() + far = timezone.datetime(2038, 1, 1) + far_after = timezone.datetime(2038, 1, 2) + session.add_all( + [ + DagModel( + dag_id="ghost_z", + max_active_tasks=1, + has_task_concurrency_limits=False, + next_dagrun=far, + next_dagrun_create_after=far_after, + is_active=True, + has_import_errors=False, + is_paused=False, + ), + DagModel( + dag_id="ghost_a", + max_active_tasks=1, + has_task_concurrency_limits=False, + next_dagrun=far, + next_dagrun_create_after=far_after, + is_active=True, + has_import_errors=False, + is_paused=False, + ), + DatasetDagRunQueue(dataset_id=ds_z_id, target_dag_id="ghost_z"), + DatasetDagRunQueue(dataset_id=ds_a_id, target_dag_id="ghost_a"), + ] + ) + session.flush() + + with caplog.at_level(logging.WARNING, logger="airflow.models.dag"): + _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session) + + assert "ghost_a" not in dataset_triggered_dag_info + assert "ghost_z" not in dataset_triggered_dag_info + msg = next( + r.message + for r in caplog.records + if "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel" in r.message + ) + assert msg.index("ghost_a") < msg.index("ghost_z") + def test_dags_needing_dagruns_dataset_aliases(self, 
dag_maker, session): # link dataset_alias hello_alias to dataset hello dataset_model = DatasetModel(uri="hello") From f59cec3f04f2aa8affd52cc6f2404bcece6fdb9c Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Thu, 26 Mar 2026 16:42:22 -0300 Subject: [PATCH 07/17] chore(scheduler): remove dataset debug logging Drop [DEBUG DATASETS] instrumentation from SchedulerJobRunner and DagModel dataset-readiness loop; inline timetable dataset_condition where it is only used once. --- airflow/jobs/scheduler_job_runner.py | 53 ---------------------------- airflow/models/dag.py | 12 +------ 2 files changed, 1 insertion(+), 64 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 1a8aeb3dde72c..2725dd71d9f3b 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1326,13 +1326,6 @@ def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Sessio non_dataset_dags = all_dags_needing_dag_runs.difference(dataset_triggered_dags) self._create_dag_runs(non_dataset_dags, session) if dataset_triggered_dags: - self.log.info( - "[DEBUG DATASETS] Dataset-triggered DAGs ready: %s", - { - dag_id: (str(first), str(last)) - for dag_id, (first, last) in dataset_triggered_dag_info.items() - }, - ) self._create_dag_runs_dataset_triggered( dataset_triggered_dags, dataset_triggered_dag_info, session ) @@ -1487,26 +1480,6 @@ def _create_dag_runs_dataset_triggered( .where(*dataset_event_filters) ).all() - ddrq_records = session.scalars( - select(DatasetDagRunQueue).where( - DatasetDagRunQueue.target_dag_id == dag.dag_id - ) - ).all() - ddrq_uris = {r.dataset.uri for r in ddrq_records} - consumed_uris = {e.dataset.uri for e in dataset_events} - missing_uris = ddrq_uris - consumed_uris - if missing_uris: - self.log.warning( - "[DEBUG DATASETS] DDRQ/event mismatch: dag_id=%s has DDRQ URIs %s with no matching " - "DatasetEvent in range (prev_exec=%s, exec_date=%s]. " - "Consumed URIs: %s. 
Possible stale DDRQ records.", - dag.dag_id, - sorted(missing_uris), - previous_dag_run.execution_date if previous_dag_run else None, - exec_date, - sorted(consumed_uris), - ) - data_interval = dag.timetable.data_interval_for_events(exec_date, dataset_events) run_id = dag.timetable.generate_run_id( run_type=DagRunType.DATASET_TRIGGERED, @@ -1529,32 +1502,6 @@ def _create_dag_runs_dataset_triggered( ) Stats.incr("dataset.triggered_dagruns") dag_run.consumed_dataset_events.extend(dataset_events) - self.log.info( - "[DEBUG DATASETS] Dataset-triggered DagRun created: dag_id=%s, exec_date=%s, " - "prev_exec=%s, data_interval=(%s, %s), " - "events_consumed=%d, event_uris=%s", - dag.dag_id, - exec_date, - previous_dag_run.execution_date if previous_dag_run else None, - data_interval.start, - data_interval.end, - len(dataset_events), - sorted({e.dataset.uri for e in dataset_events}), - ) - if dataset_events: - event_timestamps = [e.timestamp for e in dataset_events] - self.log.debug( - "[DEBUG DATASETS] Consumed event details: dag_id=%s, " - "event_ts_range=(%s, %s), " - "events=[%s]", - dag.dag_id, - min(event_timestamps), - max(event_timestamps), - ", ".join( - f"{e.dataset.uri}|ts={e.timestamp}|src={e.source_dag_id}/{e.source_run_id}" - for e in dataset_events - ), - ) session.execute( delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id) ) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 80b48cb5cf25e..fdcab7bdaf64d 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -4109,20 +4109,10 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None: for ser_dag in ser_dags: dag_id = ser_dag.dag_id statuses = dag_statuses[dag_id] - dataset_condition = ser_dag.dag.timetable.dataset_condition - if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses): + if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses): del by_dag[dag_id] del dag_statuses[dag_id] 
- else: - log.info( - "[DEBUG DATASETS] Dataset condition satisfied: dag_id=%s, condition=%s, " - "ddrq_uris=%s, ddrq_count=%d", - dag_id, - dataset_condition, - sorted(statuses.keys()), - len(statuses), - ) del dag_statuses dataset_triggered_dag_info = {} for dag_id, records in by_dag.items(): From c626f650266680fe1ec6dfc41d572dafdfd882f9 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Thu, 26 Mar 2026 17:30:02 -0300 Subject: [PATCH 08/17] fix(datasets): demote missing SerializedDagModel DDRQ log to debug Log the DDRQ-without-serialization case at debug and remove the [DEBUG DATASETS] prefix; drop redundant del of missing_from_serialized. Tests capture DEBUG, match the new message, and assert dataset_dag_run_queue rows remain after dags_needing_dagruns. --- airflow/models/dag.py | 5 ++--- tests/models/test_dag.py | 29 ++++++++++++++++++++++------- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index fdcab7bdaf64d..92d729a34e5dd 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -4097,15 +4097,14 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None: ser_dag_ids = {s.dag_id for s in ser_dags} missing_from_serialized = set(by_dag.keys()) - ser_dag_ids if missing_from_serialized: - log.warning( - "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel " + log.debug( + "DAGs in DDRQ but missing SerializedDagModel " "(skipping — condition cannot be evaluated): %s", sorted(missing_from_serialized), ) for dag_id in missing_from_serialized: del by_dag[dag_id] del dag_statuses[dag_id] - del missing_from_serialized for ser_dag in ser_dags: dag_id = ser_dag.dag_id statuses = dag_statuses[dag_id] diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 9b6d623cc878b..acb18cd08a577 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -3065,7 +3065,12 @@ def test_dags_needing_dagruns_datasets(self, dag_maker, session): assert 
dag_models == [dag_model] def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, session, caplog): - """DDRQ rows for a dag_id without SerializedDagModel must be skipped (no dataset_triggered info).""" + """DDRQ rows for a dag_id without SerializedDagModel must be skipped (no dataset_triggered info). + + Rows must remain in ``dataset_dag_run_queue`` so the scheduler can re-evaluate on a later + heartbeat once ``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops them from + the in-memory candidate set, it does not delete ORM rows). + """ orphan_dag_id = "ddr_q_no_serialized_dag" session.add(DatasetModel(uri="dataset_for_orphan_ddrq")) session.flush() @@ -3085,12 +3090,18 @@ def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, sessi session.add(DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id=orphan_dag_id)) session.flush() - with caplog.at_level(logging.WARNING, logger="airflow.models.dag"): + with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"): _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session) assert orphan_dag_id not in dataset_triggered_dag_info - assert "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel" in caplog.text + assert "DAGs in DDRQ but missing SerializedDagModel" in caplog.text assert orphan_dag_id in caplog.text + assert ( + session.query(DatasetDagRunQueue) + .filter(DatasetDagRunQueue.target_dag_id == orphan_dag_id) + .count() + == 1 + ) def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(self, session, caplog): """When multiple dags lack SerializedDagModel, the warning lists dag_ids sorted.""" @@ -3133,17 +3144,21 @@ def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(se ) session.flush() - with caplog.at_level(logging.WARNING, logger="airflow.models.dag"): + with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"): _query, dataset_triggered_dag_info = 
DagModel.dags_needing_dagruns(session) assert "ghost_a" not in dataset_triggered_dag_info assert "ghost_z" not in dataset_triggered_dag_info msg = next( - r.message - for r in caplog.records - if "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel" in r.message + r.message for r in caplog.records if "DAGs in DDRQ but missing SerializedDagModel" in r.message ) assert msg.index("ghost_a") < msg.index("ghost_z") + assert ( + session.query(DatasetDagRunQueue) + .filter(DatasetDagRunQueue.target_dag_id.in_(("ghost_a", "ghost_z"))) + .count() + == 2 + ) def test_dags_needing_dagruns_dataset_aliases(self, dag_maker, session): # link dataset_alias hello_alias to dataset hello From d089870d72a4089262d1c0c5828968df0dbb039a Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Thu, 26 Mar 2026 17:40:13 -0300 Subject: [PATCH 09/17] chore(datasets): debug-log when dataset condition passes in dags_needing_dagruns --- airflow/models/dag.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 92d729a34e5dd..310c791a45c7a 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -4108,10 +4108,19 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None: for ser_dag in ser_dags: dag_id = ser_dag.dag_id statuses = dag_statuses[dag_id] + dataset_condition = ser_dag.dag.timetable.dataset_condition - if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses): + if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses): del by_dag[dag_id] del dag_statuses[dag_id] + else: + log.debug( + "Dataset condition satisfied: dag_id=%s, condition=%s, ddrq_uris=%s, ddrq_count=%d", + dag_id, + dataset_condition, + sorted(statuses.keys()), + len(statuses), + ) del dag_statuses dataset_triggered_dag_info = {} for dag_id, records in by_dag.items(): From 36105b180054045d7ac5ec7586c29533534ffbdc Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: 
Thu, 26 Mar 2026 18:12:14 -0300 Subject: [PATCH 10/17] docs: add bugfix newsfragment and dags_needing_dagruns dataset docstring --- airflow/models/dag.py | 5 +++++ newsfragments/63546.bugfix.rst | 1 + 2 files changed, 6 insertions(+) create mode 100644 newsfragments/63546.bugfix.rst diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 310c791a45c7a..35ea26cb54be8 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -4069,6 +4069,11 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[ This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query, you should ensure that any scheduling decisions are made in a single transaction -- as soon as the transaction is committed it will be unlocked. + + For dataset-triggered scheduling, DAGs that have ``DatasetDagRunQueue`` rows but no matching + ``SerializedDagModel`` row are omitted from the returned ``dataset_triggered_dag_info`` until + serialization exists; queue rows are **not** deleted here so the scheduler can re-evaluate on a + later run. """ from airflow.models.serialized_dag import SerializedDagModel diff --git a/newsfragments/63546.bugfix.rst b/newsfragments/63546.bugfix.rst new file mode 100644 index 0000000000000..666d5db207a58 --- /dev/null +++ b/newsfragments/63546.bugfix.rst @@ -0,0 +1 @@ +Fix premature dataset-triggered DagRuns when ``SerializedDagModel`` was missing while ``DatasetDagRunQueue`` still had rows for that DAG; queue entries are kept for the next evaluation. From 684fd2c7061983193686b3bcb68cbebc458c4ffa Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Fri, 27 Mar 2026 19:16:13 -0300 Subject: [PATCH 11/17] test(dag): persist DagModel before DatasetDagRunQueue in unit tests Split DagModel and DatasetDagRunQueue inserts and flush after DagModel so foreign-key order matches production DB constraints in TestDagModel. 
--- tests/models/test_dag.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index acb18cd08a577..87212b13943d2 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -3087,6 +3087,7 @@ def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, sessi is_paused=False, ) ) + session.flush() session.add(DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id=orphan_dag_id)) session.flush() @@ -3138,6 +3139,12 @@ def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(se has_import_errors=False, is_paused=False, ), + ] + ) + session.flush() + + session.add_all( + [ DatasetDagRunQueue(dataset_id=ds_z_id, target_dag_id="ghost_z"), DatasetDagRunQueue(dataset_id=ds_a_id, target_dag_id="ghost_a"), ] From 2c70fa3dbccee0a8e692361880de021111696188 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Mon, 30 Mar 2026 15:57:27 -0300 Subject: [PATCH 12/17] test(dag): pass serialized=True in dataset dags_needing_dagruns tests Ensure dag_maker writes SerializedDagModel so dags_needing_dagruns queries match scheduler expectations for dataset-triggered DAGs. 
--- tests/models/test_dag.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 87212b13943d2..0c468a7222a17 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -3030,6 +3030,7 @@ def test_dags_needing_dagruns_datasets(self, dag_maker, session): max_active_runs=1, schedule=[dataset], start_date=pendulum.now().add(days=-2), + serialized=True, ) as dag: EmptyOperator(task_id="dummy") @@ -3181,6 +3182,7 @@ def test_dags_needing_dagruns_dataset_aliases(self, dag_maker, session): max_active_runs=1, schedule=[DatasetAlias(name="hello_alias")], start_date=pendulum.now().add(days=-2), + serialized=True, ): EmptyOperator(task_id="dummy") From 7f25115822982a40d834e2f55967f708d0d70d97 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Tue, 31 Mar 2026 09:25:27 -0300 Subject: [PATCH 13/17] Apply suggestions from code review Co-authored-by: Wei Lee --- airflow/models/dag.py | 24 ++++++++++++------------ newsfragments/63546.bugfix.rst | 2 +- tests/models/test_dag.py | 2 +- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 35ea26cb54be8..7e839ba679a45 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -4070,9 +4070,9 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[ you should ensure that any scheduling decisions are made in a single transaction -- as soon as the transaction is committed it will be unlocked. - For dataset-triggered scheduling, DAGs that have ``DatasetDagRunQueue`` rows but no matching + For dataset-triggered scheduling, Dags that have ``DatasetDagRunQueue`` rows but no matching ``SerializedDagModel`` row are omitted from the returned ``dataset_triggered_dag_info`` until - serialization exists; queue rows are **not** deleted here so the scheduler can re-evaluate on a + serialization exists; DDRQs are **not** deleted here so the scheduler can re-evaluate on a later run. 
""" from airflow.models.serialized_dag import SerializedDagModel @@ -4100,16 +4100,16 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None: select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys())) ).all() ser_dag_ids = {s.dag_id for s in ser_dags} - missing_from_serialized = set(by_dag.keys()) - ser_dag_ids - if missing_from_serialized: - log.debug( - "DAGs in DDRQ but missing SerializedDagModel " - "(skipping — condition cannot be evaluated): %s", - sorted(missing_from_serialized), - ) - for dag_id in missing_from_serialized: - del by_dag[dag_id] - del dag_statuses[dag_id] + missing_from_serialized = set(by_dag.keys()) - ser_dag_ids + if missing_from_serialized: + log.debug( + "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table." + " — skipping Dag run creation: %s", + sorted(missing_from_serialized), + ) + for dag_id in missing_from_serialized: + del by_dag[dag_id] + del dag_statuses[dag_id] for ser_dag in ser_dags: dag_id = ser_dag.dag_id statuses = dag_statuses[dag_id] diff --git a/newsfragments/63546.bugfix.rst b/newsfragments/63546.bugfix.rst index 666d5db207a58..66a1b385d5d4b 100644 --- a/newsfragments/63546.bugfix.rst +++ b/newsfragments/63546.bugfix.rst @@ -1 +1 @@ -Fix premature dataset-triggered DagRuns when ``SerializedDagModel`` was missing while ``DatasetDagRunQueue`` still had rows for that DAG; queue entries are kept for the next evaluation. +Fix premature dataset-triggered DagRuns when ``SerializedDagModel`` was missing while ``DatasetDagRunQueue`` still had rows for that Dag; queue entries are kept for the next evaluation. 
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 0c468a7222a17..58bbf2c97562d 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -3066,7 +3066,7 @@ def test_dags_needing_dagruns_datasets(self, dag_maker, session): assert dag_models == [dag_model] def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, session, caplog): - """DDRQ rows for a dag_id without SerializedDagModel must be skipped (no dataset_triggered info). + """DDRQ rows for a Dag without SerializedDagModel must be skipped (no dataset_triggered info). Rows must remain in ``dataset_dag_run_queue`` so the scheduler can re-evaluate on a later heartbeat once ``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops them from From 3c67b9f59d90014c098315dc7cac7e8e606ddbd6 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Tue, 31 Mar 2026 10:27:13 -0300 Subject: [PATCH 14/17] style(dag): normalize line endings in DDRQ serialized-DAG guard block --- airflow/models/dag.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 7e839ba679a45..a5e04972c26a1 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -4100,16 +4100,16 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None: select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys())) ).all() ser_dag_ids = {s.dag_id for s in ser_dags} - missing_from_serialized = set(by_dag.keys()) - ser_dag_ids - if missing_from_serialized: - log.debug( - "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table." 
- " — skipping Dag run creation: %s", - sorted(missing_from_serialized), - ) - for dag_id in missing_from_serialized: - del by_dag[dag_id] - del dag_statuses[dag_id] + missing_from_serialized = set(by_dag.keys()) - ser_dag_ids + if missing_from_serialized: + log.debug( + "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table." + " — skipping Dag run creation: %s", + sorted(missing_from_serialized), + ) + for dag_id in missing_from_serialized: + del by_dag[dag_id] + del dag_statuses[dag_id] for ser_dag in ser_dags: dag_id = ser_dag.dag_id statuses = dag_statuses[dag_id] From 8561190d8ff5ad292d5625442b838f66376a9b76 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Wed, 1 Apr 2026 16:35:52 -0300 Subject: [PATCH 15/17] test(dag): align DDRQ missing-serialized log assertions with dag.py dags_needing_dagruns now logs that dags are not found in serialized_dag instead of the older DDRQ/SerializedDagModel wording. --- tests/models/test_dag.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 58bbf2c97562d..c155b7497e074 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -3096,7 +3096,7 @@ def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, sessi _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session) assert orphan_dag_id not in dataset_triggered_dag_info - assert "DAGs in DDRQ but missing SerializedDagModel" in caplog.text + assert "not found in the serialized_dag table" in caplog.text assert orphan_dag_id in caplog.text assert ( session.query(DatasetDagRunQueue) @@ -3157,9 +3157,7 @@ def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(se assert "ghost_a" not in dataset_triggered_dag_info assert "ghost_z" not in dataset_triggered_dag_info - msg = next( - r.message for r in caplog.records if "DAGs in DDRQ but missing SerializedDagModel" in r.message - ) + msg = 
next(r.message for r in caplog.records if "not found in the serialized_dag table" in r.message) assert msg.index("ghost_a") < msg.index("ghost_z") assert ( session.query(DatasetDagRunQueue) From e3198c9cff86ed70a6ecdb8e23701dfd1f66af06 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Wed, 1 Apr 2026 17:42:55 -0300 Subject: [PATCH 16/17] test(dag): assert full DDRQ missing-serialized log line in caplog --- tests/models/test_dag.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index c155b7497e074..e13da06ab1ab1 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -3096,7 +3096,10 @@ def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, sessi _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session) assert orphan_dag_id not in dataset_triggered_dag_info - assert "not found in the serialized_dag table" in caplog.text + assert ( + "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table." + in caplog.text + ) assert orphan_dag_id in caplog.text assert ( session.query(DatasetDagRunQueue) @@ -3157,7 +3160,12 @@ def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(se assert "ghost_a" not in dataset_triggered_dag_info assert "ghost_z" not in dataset_triggered_dag_info - msg = next(r.message for r in caplog.records if "not found in the serialized_dag table" in r.message) + msg = next( + r.message + for r in caplog.records + if "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table." 
+ in r.message + ) assert msg.index("ghost_a") < msg.index("ghost_z") assert ( session.query(DatasetDagRunQueue) From e0425b526a747288b3235f73555f7b6bfb627078 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Sat, 4 Apr 2026 17:11:14 -0300 Subject: [PATCH 17/17] chore: change missing serialized DAG log level from debug to info and remove associated newsfragment --- airflow/models/dag.py | 2 +- newsfragments/63546.bugfix.rst | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) delete mode 100644 newsfragments/63546.bugfix.rst diff --git a/airflow/models/dag.py b/airflow/models/dag.py index a5e04972c26a1..741b28120df4b 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -4102,7 +4102,7 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None: ser_dag_ids = {s.dag_id for s in ser_dags} missing_from_serialized = set(by_dag.keys()) - ser_dag_ids if missing_from_serialized: - log.debug( + log.info( "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table." " — skipping Dag run creation: %s", sorted(missing_from_serialized), ) diff --git a/newsfragments/63546.bugfix.rst b/newsfragments/63546.bugfix.rst deleted file mode 100644 index 66a1b385d5d4b..0000000000000 --- a/newsfragments/63546.bugfix.rst +++ /dev/null @@ -1 +0,0 @@ -Fix premature dataset-triggered DagRuns when ``SerializedDagModel`` was missing while ``DatasetDagRunQueue`` still had rows for that Dag; queue entries are kept for the next evaluation.