@@ -3064,6 +3064,87 @@ def test_dags_needing_dagruns_datasets(self, dag_maker, session):
30643064 dag_models = query .all ()
30653065 assert dag_models == [dag_model ]
30663066
def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, session, caplog):
    """
    Check that a queued dataset event for a DAG with no serialized form is ignored.

    A DatasetModel, a DagModel and a DatasetDagRunQueue row targeting that DAG are
    persisted; this test never creates a SerializedDagModel for the DAG. The DAG id
    must then be absent from the dataset-triggered info returned by
    ``DagModel.dags_needing_dagruns``, and the captured log text must contain both
    the warning marker and the DAG id.
    """
    dataset_uri = "dataset_for_orphan_ddrq"
    orphan_dag_id = "ddr_q_no_serialized_dag"
    expected_warning = "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel"

    session.add(DatasetModel(uri=dataset_uri))
    session.flush()
    # Fetch the primary key of the dataset row that was just flushed, by its URI.
    id_lookup = session.query(DatasetModel.id).filter_by(uri=dataset_uri)
    dataset_id = id_lookup.scalar()

    orphan_dag = DagModel(
        dag_id=orphan_dag_id,
        max_active_tasks=1,
        has_task_concurrency_limits=False,
        next_dagrun=timezone.datetime(2038, 1, 1),
        next_dagrun_create_after=timezone.datetime(2038, 1, 2),
        is_active=True,
        has_import_errors=False,
        is_paused=False,
    )
    session.add(orphan_dag)
    queue_row = DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id=orphan_dag_id)
    session.add(queue_row)
    session.flush()

    # Only WARNING and above from the DAG model logger is captured for the call.
    with caplog.at_level(logging.WARNING, logger="airflow.models.dag"):
        _, triggered_info = DagModel.dags_needing_dagruns(session)

    assert orphan_dag_id not in triggered_info
    assert expected_warning in caplog.text
    assert orphan_dag_id in caplog.text
3094+
def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(self, session, caplog):
    """
    When multiple dags lack SerializedDagModel, the warning lists dag_ids sorted.

    Two DagModel rows ("ghost_z", then "ghost_a") and one DatasetDagRunQueue row for
    each are persisted; this test creates no SerializedDagModel for either. After
    calling ``DagModel.dags_needing_dagruns`` neither dag_id may appear in the
    returned dataset-triggered info, and in the warning message "ghost_a" must be
    positioned before "ghost_z".
    """
    warning_marker = "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel"
    # "ghost_z" is created before "ghost_a", so insertion order differs from sorted order.
    dag_ids = ("ghost_z", "ghost_a")
    uris = {dag_id: f"ds_{dag_id}" for dag_id in dag_ids}

    session.add_all([DatasetModel(uri=uris[dag_id]) for dag_id in dag_ids])
    session.flush()
    dataset_ids = {
        dag_id: session.query(DatasetModel.id).filter_by(uri=uris[dag_id]).scalar() for dag_id in dag_ids
    }

    far = timezone.datetime(2038, 1, 1)
    far_after = timezone.datetime(2038, 1, 2)
    rows = [
        DagModel(
            dag_id=dag_id,
            max_active_tasks=1,
            has_task_concurrency_limits=False,
            next_dagrun=far,
            next_dagrun_create_after=far_after,
            is_active=True,
            has_import_errors=False,
            is_paused=False,
        )
        for dag_id in dag_ids
    ]
    rows.extend(
        DatasetDagRunQueue(dataset_id=dataset_ids[dag_id], target_dag_id=dag_id) for dag_id in dag_ids
    )
    session.add_all(rows)
    session.flush()

    with caplog.at_level(logging.WARNING, logger="airflow.models.dag"):
        _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)

    assert "ghost_a" not in dataset_triggered_dag_info
    assert "ghost_z" not in dataset_triggered_dag_info

    # getMessage() builds the final text from msg/args and does not rely on a
    # formatter having populated ``record.message`` first. The ``None`` default
    # turns a missing warning into a readable assertion failure instead of a
    # bare StopIteration.
    msg = next(
        (text for text in (r.getMessage() for r in caplog.records) if warning_marker in text),
        None,
    )
    assert msg is not None, f"expected a warning containing {warning_marker!r}; got: {caplog.text!r}"
    # Check presence explicitly so an absent id fails as an assertion, not a ValueError.
    assert "ghost_a" in msg
    assert "ghost_z" in msg
    assert msg.index("ghost_a") < msg.index("ghost_z")
3147+
30673148 def test_dags_needing_dagruns_dataset_aliases (self , dag_maker , session ):
30683149 # link dataset_alias hello_alias to dataset hello
30693150 dataset_model = DatasetModel (uri = "hello" )
0 commit comments