diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 67006a7d8ee27..d949012d4f5a4 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1624,13 +1624,23 @@ def _run_scheduler_loop(self) -> None: self.log.exception("Something went wrong when trying to save task event logs.") with create_session() as session: - # Only retrieve expired deadlines that haven't been processed yet. - # `missed` is False by default until the handler sets it. - for deadline in session.scalars( + # Lock expired, unhandled deadlines with FOR UPDATE SKIP LOCKED so + # concurrent HA scheduler replicas don't both process the same row + # and create duplicate callbacks. + deadline_query = ( select(Deadline) .where(Deadline.deadline_time < datetime.now(timezone.utc)) .where(~Deadline.missed) .options(selectinload(Deadline.callback), selectinload(Deadline.dagrun)) + ) + for deadline in session.scalars( + with_row_locks( + deadline_query, + of=Deadline, + session=session, + skip_locked=True, + key_share=False, + ) ): deadline.handle_miss(session) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 5ccbb7e40568d..b0f72b91db3eb 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -101,6 +101,7 @@ from airflow.serialization.serialized_objects import LazyDeserializedDAG from airflow.timetables.base import DagRunInfo, DataInterval from airflow.utils.session import create_session, provide_session +from airflow.utils.sqlalchemy import with_row_locks from airflow.utils.state import CallbackState, DagRunState, State, TaskInstanceState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -7897,6 +7898,67 @@ def test_process_expired_deadlines_no_deadlines_found(self, mock_handle_miss, se # The handler 
should not be called, but no exceptions should be raised either. mock_handle_miss.assert_not_called() + @mock.patch("airflow.models.Deadline.handle_miss") + def test_expired_deadline_locked_by_other_scheduler_is_skipped( + self, mock_handle_miss, session, dag_maker + ): + """The scheduler's deadline loop must skip rows another replica already holds.""" + if session.get_bind().dialect.name == "sqlite": + pytest.skip("SQLite does not support row-level locking (SKIP LOCKED)") + + past_date = timezone.utcnow() - timedelta(minutes=5) + dag_id = "test_deadline_locked_by_other_scheduler" + callback_path = "classpath.notify" + + with dag_maker(dag_id=dag_id): + EmptyOperator(task_id="empty") + dagrun_id = dag_maker.create_dagrun().id + + serialized_dag = session.scalar(select(SerializedDagModel).where(SerializedDagModel.dag_id == dag_id)) + assert serialized_dag is not None + + deadline_alert = DeadlineAlert( + serialized_dag_id=serialized_dag.id, + name="Test Skip Locked", + reference={"type": "dag", "dag_id": dag_id}, + interval=300.0, + callback_def={"classpath": callback_path, "kwargs": {}}, + ) + session.add(deadline_alert) + session.flush() + + session.add( + Deadline( + deadline_time=past_date, + callback=AsyncCallback(callback_path), + dagrun_id=dagrun_id, + dag_id=dag_id, + deadline_alert_id=deadline_alert.id, + ) + ) + session.commit() + + # scoped=False gives an independent session with its own connection; the + # default scoped_session would reuse this thread's session and locks held + # by "self" do not block "self". 
+ with create_session(scoped=False) as competing_session: + locked_rows = competing_session.scalars( + with_row_locks( + select(Deadline).where(~Deadline.missed), + of=Deadline, + session=competing_session, + skip_locked=True, + key_share=False, + ) + ).all() + assert len(locked_rows) == 1 + + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=1, executors=[MockExecutor()]) + self.job_runner._execute() + + mock_handle_miss.assert_not_called() + def test_emit_running_dags_metric(self, dag_maker, monkeypatch): """Test that the running_dags metric is emitted correctly.""" with dag_maker("metric_dag") as dag: