-
Notifications
You must be signed in to change notification settings - Fork 2k
Fixes #23646: Fix memory leak in scan_dags_job_background by adding singleton guard #27057
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ffa5e4a
cc92157
c78b0a8
9d7c859
8b50f14
ed4ea0e
26e972f
f33fa44
5206e77
d5d7770
df061c6
bcba276
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,7 @@ | |
| import os | ||
| import re | ||
| import sys | ||
| import threading | ||
| import traceback | ||
| from multiprocessing import Process | ||
| from typing import Optional | ||
|
|
@@ -208,10 +209,55 @@ def _run_old_scheduler_job() -> "SchedulerJob": | |
| return scheduler_job | ||
|
|
||
|
|
||
| def scan_dags_job_background(): | ||
| """ | ||
| Runs the scheduler scan in another thread | ||
| to not block the API call | ||
| _scan_lock = threading.Lock() | ||
| _current_scan: Optional[ScanDagsTask] = None | ||
|
|
||
|
|
||
| def _start_scan(): | ||
| """Start a new ScanDagsTask and spawn a reaper thread to join it. | ||
|
|
||
| Must be called while holding _scan_lock. | ||
| """ | ||
| global _current_scan # pylint: disable=global-statement | ||
| process = ScanDagsTask() | ||
| process.start() | ||
| _current_scan = process | ||
| reaper = threading.Thread(target=_reap_scan, args=(process,), daemon=True) | ||
| reaper.start() | ||
|
|
||
|
|
||
| def _reap_scan(process: ScanDagsTask): | ||
| """Wait for the scan process to finish and release resources. | ||
|
|
||
| Runs in a daemon thread. Only joins the process and clears module | ||
| state — never forks a new process, because forking from a non-main | ||
| thread with the default ``fork`` start-method can deadlock. | ||
| """ | ||
|
Comment on lines
+232
to
+235
|
||
| process.join() | ||
| with _scan_lock: | ||
| global _current_scan # pylint: disable=global-statement | ||
| if _current_scan is process: | ||
| _current_scan = None | ||
|
|
||
|
|
||
| def scan_dags_job_background(): | ||
| """ | ||
| Runs the scheduler scan in a separate process | ||
| to not block the API call. | ||
|
|
||
| Uses a per-worker guard to prevent spawning multiple concurrent | ||
| ScanDagsTask processes from the same Python worker. Each process | ||
| imports the full Airflow scheduler stack, so spawning duplicates | ||
| increases memory usage and can create orphaned SchedulerJob entries | ||
| in the Airflow DB. This guard does not coordinate across multiple | ||
| Gunicorn workers or other processes. | ||
|
|
||
| If a scan is already running when a new deploy arrives, the call | ||
| is skipped. Newly deployed DAGs will be discovered by the next | ||
| deploy-triggered scan or by Airflow's periodic scheduler. | ||
| """ | ||
| with _scan_lock: | ||
| if _current_scan is not None and _current_scan.is_alive(): | ||
| logger.info("DAG scan already in progress, skipping") | ||
| return | ||
| _start_scan() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,126 @@ | ||
| # Copyright 2025 Collate | ||
| # Licensed under the Collate Community License, Version 1.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| """ | ||
| Test singleton guard for scan_dags_job_background. | ||
| Verifies fix for https://github.com/open-metadata/OpenMetadata/issues/23646 | ||
| """ | ||
|
|
||
| from unittest.mock import MagicMock, patch | ||
|
|
||
| import openmetadata_managed_apis.api.utils as utils_module | ||
|
|
||
|
|
||
| def _reset_module_state(): | ||
| """Reset the module-level singleton state between tests.""" | ||
| utils_module._current_scan = None | ||
|
|
||
|
|
||
| def test_first_call_starts_process(): | ||
| """First call should spawn a new ScanDagsTask process.""" | ||
| _reset_module_state() | ||
|
|
||
| mock_process = MagicMock() | ||
| with patch.object(utils_module, "ScanDagsTask", return_value=mock_process): | ||
| with patch.object(utils_module.threading, "Thread"): | ||
| utils_module.scan_dags_job_background() | ||
|
|
||
| mock_process.start.assert_called_once() | ||
| assert utils_module._current_scan is mock_process | ||
|
|
||
|
|
||
| def test_skips_when_scan_alive(): | ||
| """While a scan is alive, new calls should skip (not spawn another).""" | ||
| _reset_module_state() | ||
|
|
||
| alive_process = MagicMock() | ||
| alive_process.is_alive.return_value = True | ||
| utils_module._current_scan = alive_process | ||
|
|
||
| with patch.object(utils_module, "ScanDagsTask") as mock_cls: | ||
| utils_module.scan_dags_job_background() | ||
|
|
||
| mock_cls.assert_not_called() | ||
| assert utils_module._current_scan is alive_process | ||
|
|
||
|
|
||
| def test_replaces_finished_scan_with_new_one(): | ||
| """When previous scan finished, a new call should start a fresh scan.""" | ||
| _reset_module_state() | ||
|
|
||
| finished_process = MagicMock() | ||
| finished_process.is_alive.return_value = False | ||
| utils_module._current_scan = finished_process | ||
|
|
||
| new_process = MagicMock() | ||
| with patch.object(utils_module, "ScanDagsTask", return_value=new_process): | ||
| with patch.object(utils_module.threading, "Thread"): | ||
| utils_module.scan_dags_job_background() | ||
|
|
||
| new_process.start.assert_called_once() | ||
| assert utils_module._current_scan is new_process | ||
|
|
||
|
|
||
| def test_reaper_clears_current_scan(): | ||
| """Reaper thread should join process and clear _current_scan.""" | ||
| _reset_module_state() | ||
|
|
||
| finished_process = MagicMock() | ||
| utils_module._current_scan = finished_process | ||
|
|
||
| with patch.object(utils_module, "ScanDagsTask") as mock_cls: | ||
| utils_module._reap_scan(finished_process) | ||
|
|
||
| finished_process.join.assert_called_once() | ||
| mock_cls.assert_not_called() | ||
| assert utils_module._current_scan is None | ||
|
|
||
|
|
||
| def test_reaper_never_forks(): | ||
| """Reaper thread must never start a new process (fork from non-main thread).""" | ||
| _reset_module_state() | ||
|
|
||
| finished_process = MagicMock() | ||
| utils_module._current_scan = finished_process | ||
|
|
||
| with patch.object(utils_module, "ScanDagsTask") as mock_cls: | ||
| utils_module._reap_scan(finished_process) | ||
|
|
||
| mock_cls.assert_not_called() | ||
|
|
||
|
|
||
|
Comment on lines
+86
to
+98
|
||
| def test_no_daemon_flag_on_process(): | ||
| """ScanDagsTask must NOT be created with daemon=True (spawns children).""" | ||
| _reset_module_state() | ||
|
|
||
| mock_process = MagicMock() | ||
| mock_process.daemon = False | ||
| with patch.object( | ||
| utils_module, "ScanDagsTask", return_value=mock_process | ||
| ) as mock_cls: | ||
| with patch.object(utils_module.threading, "Thread"): | ||
| utils_module.scan_dags_job_background() | ||
|
|
||
| mock_cls.assert_called_once_with() | ||
|
RajdeepKushwaha5 marked this conversation as resolved.
|
||
| assert mock_process.daemon is False | ||
|
|
||
|
|
||
| def test_stale_reaper_does_not_clear_replaced_scan(): | ||
| """Stale reaper must not clear _current_scan if another scan replaced it.""" | ||
| _reset_module_state() | ||
|
|
||
| old_process = MagicMock() | ||
| new_process = MagicMock() | ||
| utils_module._current_scan = new_process | ||
|
|
||
| utils_module._reap_scan(old_process) | ||
|
|
||
| old_process.join.assert_called_once() | ||
| assert utils_module._current_scan is new_process | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_start_scan()mutates_current_scan/_rescan_requestedbut relies on callers already holding_scan_lock. To avoid accidental future calls without the lock (which would introduce races), consider either acquiring_scan_lockinside_start_scan()or adding an explicit comment/assertion that it must only be called under the lock.