Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1624,13 +1624,23 @@ def _run_scheduler_loop(self) -> None:
self.log.exception("Something went wrong when trying to save task event logs.")

with create_session() as session:
# Only retrieve expired deadlines that haven't been processed yet.
# `missed` is False by default until the handler sets it.
for deadline in session.scalars(
# Lock expired, unhandled deadlines with FOR UPDATE SKIP LOCKED so
# concurrent HA scheduler replicas don't both process the same row
# and create duplicate callbacks.
deadline_query = (
select(Deadline)
.where(Deadline.deadline_time < datetime.now(timezone.utc))
.where(~Deadline.missed)
.options(selectinload(Deadline.callback), selectinload(Deadline.dagrun))
)
for deadline in session.scalars(
with_row_locks(
deadline_query,
of=Deadline,
session=session,
skip_locked=True,
key_share=False,
)
):
deadline.handle_miss(session)

Expand Down
62 changes: 62 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
from airflow.serialization.serialized_objects import LazyDeserializedDAG
from airflow.timetables.base import DagRunInfo, DataInterval
from airflow.utils.session import create_session, provide_session
from airflow.utils.sqlalchemy import with_row_locks
from airflow.utils.state import CallbackState, DagRunState, State, TaskInstanceState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

Expand Down Expand Up @@ -7897,6 +7898,67 @@ def test_process_expired_deadlines_no_deadlines_found(self, mock_handle_miss, se
# The handler should not be called, but no exceptions should be raised either.
mock_handle_miss.assert_not_called()

@mock.patch("airflow.models.Deadline.handle_miss")
def test_expired_deadline_locked_by_other_scheduler_is_skipped(
    self, mock_handle_miss, session, dag_maker
):
    """The scheduler's deadline loop must skip rows another replica already holds.

    Simulates an HA deployment: a "competing scheduler" (a second, independent
    session) takes FOR UPDATE SKIP LOCKED row locks on the expired deadline,
    then this scheduler runs one loop. Because the scheduler's deadline query
    also uses SKIP LOCKED, it must not see the locked row, so ``handle_miss``
    must never fire — preventing duplicate deadline callbacks across replicas.
    """
    # SKIP LOCKED requires real row-level locking, which SQLite lacks.
    if session.get_bind().dialect.name == "sqlite":
        pytest.skip("SQLite does not support row-level locking (SKIP LOCKED)")

    # A deadline 5 minutes in the past is already expired when the loop runs.
    past_date = timezone.utcnow() - timedelta(minutes=5)
    dag_id = "test_deadline_locked_by_other_scheduler"
    callback_path = "classpath.notify"

    with dag_maker(dag_id=dag_id):
        EmptyOperator(task_id="empty")
    dagrun_id = dag_maker.create_dagrun().id

    # dag_maker serializes the DAG; the alert below references that row by id.
    serialized_dag = session.scalar(select(SerializedDagModel).where(SerializedDagModel.dag_id == dag_id))
    assert serialized_dag is not None

    deadline_alert = DeadlineAlert(
        serialized_dag_id=serialized_dag.id,
        name="Test Skip Locked",
        reference={"type": "dag", "dag_id": dag_id},
        interval=300.0,
        callback_def={"classpath": callback_path, "kwargs": {}},
    )
    session.add(deadline_alert)
    # Flush (not commit) so deadline_alert.id is populated for the FK below.
    session.flush()

    session.add(
        Deadline(
            deadline_time=past_date,
            callback=AsyncCallback(callback_path),
            dagrun_id=dagrun_id,
            dag_id=dag_id,
            deadline_alert_id=deadline_alert.id,
        )
    )
    # Commit so the row is visible to the competing session's transaction.
    session.commit()

    # scoped=False gives an independent session with its own connection; the
    # default scoped_session would reuse this thread's session and locks held
    # by "self" do not block "self".
    with create_session(scoped=False) as competing_session:
        # Acquire the row lock exactly the way the scheduler loop does
        # (SKIP LOCKED, no KEY SHARE), and hold it for the duration of the
        # `with` block while the scheduler below runs.
        locked_rows = competing_session.scalars(
            with_row_locks(
                select(Deadline).where(~Deadline.missed),
                of=Deadline,
                session=competing_session,
                skip_locked=True,
                key_share=False,
            )
        ).all()
        # Sanity check: the competing replica really did grab our one deadline.
        assert len(locked_rows) == 1

        # Run a single scheduler loop while the lock is still held.
        scheduler_job = Job()
        self.job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=1, executors=[MockExecutor()])
        self.job_runner._execute()

    # The locked row must have been skipped, so the miss handler never ran.
    mock_handle_miss.assert_not_called()

def test_emit_running_dags_metric(self, dag_maker, monkeypatch):
"""Test that the running_dags metric is emitted correctly."""
with dag_maker("metric_dag") as dag:
Expand Down
Loading