Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import os
import re
import sys
import threading
import traceback
from multiprocessing import Process
from typing import Optional
Expand Down Expand Up @@ -208,10 +209,55 @@ def _run_old_scheduler_job() -> "SchedulerJob":
return scheduler_job


# Per-worker singleton guard for the background DAG scan.
# _scan_lock serializes all reads/writes of _current_scan within this
# Python worker; it does NOT coordinate across Gunicorn workers or
# other processes.
_scan_lock = threading.Lock()
# The currently running scan process, or None when no scan is active.
_current_scan: Optional[ScanDagsTask] = None

def _start_scan():
    """Start a new ScanDagsTask and spawn a reaper thread to join it.

    NOTE: mutates module state (``_current_scan``) without taking the
    lock itself, to avoid re-entrant acquisition from
    ``scan_dags_job_background``. Callers MUST already hold
    ``_scan_lock`` when invoking this function.
    """
    global _current_scan  # pylint: disable=global-statement
    process = ScanDagsTask()
    process.start()
    _current_scan = process
    # Daemon reaper thread: joins the finished process so it does not
    # linger as a zombie, and clears _current_scan so the next
    # deploy-triggered call can start a fresh scan.
    reaper = threading.Thread(target=_reap_scan, args=(process,), daemon=True)
    reaper.start()


def _reap_scan(process: ScanDagsTask):
    """Wait for the scan process to finish and release resources.

    Runs in a daemon thread. Only joins the process and clears module
    state — never forks a new process, because forking from a non-main
    thread with the default ``fork`` start-method can deadlock.
    """
    process.join()
    with _scan_lock:
        global _current_scan  # pylint: disable=global-statement
        # Stale-reaper guard: only clear the slot if it still points at
        # the process we just joined; a newer scan may have replaced it.
        if _current_scan is process:
            _current_scan = None


def scan_dags_job_background():
    """Kick off a scheduler DAG scan in a separate process so the API
    call is not blocked.

    A per-worker singleton guard ensures at most one ScanDagsTask is
    alive per Python worker: each such process imports the full Airflow
    scheduler stack, so duplicates inflate memory usage and can leave
    orphaned SchedulerJob entries in the Airflow DB. The guard is
    process-local only — it does not coordinate across multiple
    Gunicorn workers or other processes.

    When a scan is already running at deploy time, the request is
    simply skipped; newly deployed DAGs are picked up by the next
    deploy-triggered scan or by Airflow's periodic scheduler.
    """
    with _scan_lock:
        running = _current_scan
        if running is not None and running.is_alive():
            logger.info("DAG scan already in progress, skipping")
        else:
            _start_scan()
126 changes: 126 additions & 0 deletions openmetadata-airflow-apis/tests/unit/test_scan_dags_singleton.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test singleton guard for scan_dags_job_background.
Verifies fix for https://github.com/open-metadata/OpenMetadata/issues/23646
"""

from unittest.mock import MagicMock, patch

import openmetadata_managed_apis.api.utils as utils_module


def _reset_module_state():
    """Restore the module-level singleton guard to its pristine state."""
    utils_module._current_scan = None


def test_first_call_starts_process():
    """First call should spawn a new ScanDagsTask process."""
    _reset_module_state()

    fake_proc = MagicMock()
    with patch.object(
        utils_module, "ScanDagsTask", return_value=fake_proc
    ), patch.object(utils_module.threading, "Thread"):
        utils_module.scan_dags_job_background()

    fake_proc.start.assert_called_once()
    assert utils_module._current_scan is fake_proc


def test_skips_when_scan_alive():
    """While a scan is alive, new calls should skip (not spawn another)."""
    _reset_module_state()

    running = MagicMock()
    running.is_alive.return_value = True
    utils_module._current_scan = running

    with patch.object(utils_module, "ScanDagsTask") as scan_cls:
        utils_module.scan_dags_job_background()

    scan_cls.assert_not_called()
    assert utils_module._current_scan is running


def test_replaces_finished_scan_with_new_one():
    """When previous scan finished, a new call should start a fresh scan."""
    _reset_module_state()

    done = MagicMock()
    done.is_alive.return_value = False
    utils_module._current_scan = done

    fresh = MagicMock()
    with patch.object(
        utils_module, "ScanDagsTask", return_value=fresh
    ), patch.object(utils_module.threading, "Thread"):
        utils_module.scan_dags_job_background()

    fresh.start.assert_called_once()
    assert utils_module._current_scan is fresh


def test_reaper_clears_current_scan():
    """Reaper thread should join process and clear _current_scan."""
    _reset_module_state()

    done = MagicMock()
    utils_module._current_scan = done

    with patch.object(utils_module, "ScanDagsTask") as scan_cls:
        utils_module._reap_scan(done)

    done.join.assert_called_once()
    scan_cls.assert_not_called()
    assert utils_module._current_scan is None


def test_reaper_never_forks():
    """Reaper thread must never start a new process (fork from non-main thread)."""
    _reset_module_state()

    joined = MagicMock()
    utils_module._current_scan = joined

    with patch.object(utils_module, "ScanDagsTask") as scan_cls:
        utils_module._reap_scan(joined)

    scan_cls.assert_not_called()


def test_no_daemon_flag_on_process():
    """ScanDagsTask must NOT be created with daemon=True (spawns children)."""
    _reset_module_state()

    mock_process = MagicMock()
    mock_process.daemon = False
    with patch.object(
        utils_module, "ScanDagsTask", return_value=mock_process
    ) as mock_cls:
        with patch.object(utils_module.threading, "Thread"):
            utils_module.scan_dags_job_background()

    # Constructor must be called with no arguments at all — in particular
    # no daemon=True, since a daemonic process cannot spawn children.
    mock_cls.assert_called_once_with()
    assert mock_process.daemon is False


def test_stale_reaper_does_not_clear_replaced_scan():
    """Stale reaper must not clear _current_scan if another scan replaced it."""
    _reset_module_state()

    stale = MagicMock()
    replacement = MagicMock()
    utils_module._current_scan = replacement

    utils_module._reap_scan(stale)

    stale.join.assert_called_once()
    assert utils_module._current_scan is replacement
Loading