Fixes #23646: Fix memory leak in scan_dags_job_background by adding singleton guard #27057
Conversation
…d by adding singleton guard

scan_dags_job_background() spawns a new multiprocessing.Process per deploy call. Each process imports the full Airflow scheduler stack (~120Mi) and is never join()ed, so zombie processes accumulate and memory is never released.

Fix: track the running process with a threading.Lock, join() the previous process before starting a new one, skip if a scan is already in progress, and set daemon=True so zombies are cleaned up on parent exit.
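The guard described in this commit can be sketched as follows. This is an illustration of the locking scheme, not the merged code: ScanDagsTask is stubbed with a plain multiprocessing.Process running a placeholder _scan_body, and the real module and function names may differ.

```python
import multiprocessing
import threading
import time

_scan_lock = threading.Lock()  # guards _current_scan
_current_scan = None           # the in-flight scan process, if any


def _scan_body():
    """Stand-in for the real ScanDagsTask work (assumption)."""
    time.sleep(0.2)


def scan_dags_job_background():
    """Spawn at most one scheduler-scan process per worker."""
    global _current_scan
    with _scan_lock:
        if _current_scan is not None:
            if _current_scan.is_alive():
                return  # a scan is already in progress: skip this call
            _current_scan.join()  # reap the previous, finished process
        process = multiprocessing.Process(target=_scan_body, daemon=True)
        process.start()
        _current_scan = process
```

Calling the function twice in quick succession starts only one process; the second call returns without spawning.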
Hi there 👋 Thanks for your contribution! The OpenMetadata team will review the PR shortly! Let us know if you need any help!
Pull request overview
Fixes unbounded process/memory growth in the Airflow managed APIs by preventing scan_dags_job_background() from spawning a new scheduler-scan multiprocessing.Process on every deploy call.
Changes:
- Adds a module-level lock and “current scan” process reference to guard concurrent invocations.
- Joins the previous scan process (when finished) before starting a new one, and skips starting a new scan if one is already running.
- Runs the scan process as a daemon and updates the function docstring to reflect the approach.
| """ | ||
| process = ScanDagsTask() | ||
| process.start() | ||
| global _current_scan # noqa: PLW0603 |
# noqa: PLW0603 won’t silence pylint (this package uses # pylint: disable=... in multiple places, e.g. api/routes/health.py:35-37). If pylint is part of CI for this module, it may still flag global _current_scan; consider using the equivalent # pylint: disable=global-statement (or project-standard suppression) instead of noqa.
-    global _current_scan  # noqa: PLW0603
+    global _current_scan  # pylint: disable=global-statement
…dd tests

- Remove daemon=True: ScanDagsTask spawns child processes (Airflow scheduler internals), which is forbidden for daemon processes
- Add _rescan_requested flag: ensures deploys during an active scan queue a follow-up scan instead of silently dropping
- Clarify docstring: guard is per-worker, not cross-Gunicorn
- Replace noqa with pylint disable to match project conventions
- Add unit tests covering singleton guard behavior
After joining a finished scan, check _rescan_requested before starting a new process. If no rescan was queued (flag is False), return early instead of unconditionally spawning a new scan. This ensures deploys that arrive during an active scan actually trigger a follow-up scan. Updated tests to cover both paths: rescan-requested starts new scan, no-rescan-requested returns without spawning.
…test

- Extract _start_scan() and _reap_scan() helpers from main function
- Reaper thread join()s the scan process and automatically starts a follow-up scan if _rescan_requested was set, ensuring deploys during an active scan are never lost, even without another deploy call
- Simplify scan_dags_job_background() to just guard + delegate
- Strengthen test_no_daemon_flag_on_process: assert process.daemon stays False after construction (catches post-init daemon=True)
- Add test_reaper_starts_follow_up_when_rescan_requested
- Add test_reaper_clears_current_scan_without_follow_up
A stale reaper thread (for process A) must not trigger a rescan if another scan (process B) has already replaced it. Move the _rescan_requested check inside the 'if _current_scan is process' block so only the reaper for the current scan can start a follow-up. Add test_stale_reaper_does_not_spawn_duplicate to cover the scenario.
def _start_scan():
    """Start a new ScanDagsTask and spawn a reaper thread to join it."""
    global _current_scan, _rescan_requested  # pylint: disable=global-statement
    _rescan_requested = False
    process = ScanDagsTask()
    process.start()
    _current_scan = process
_start_scan() mutates _current_scan/_rescan_requested but relies on callers already holding _scan_lock. To avoid accidental future calls without the lock (which would introduce races), consider either acquiring _scan_lock inside _start_scan() or adding an explicit comment/assertion that it must only be called under the lock.
🟡 Playwright Results: all passed · ✅ 3599 passed · ❌ 0 failed · 🟡 25 flaky (passed on retry) · ⏭️ 207 skipped

How to debug locally:

    # Download the playwright-test-results-<shard> artifact and unzip
    npx playwright show-trace path/to/trace.zip  # view trace
    process.join()
    with _scan_lock:
        global _current_scan  # pylint: disable=global-statement
        if _current_scan is process:
            _current_scan = None
            if _rescan_requested:
                logger.info("Running queued rescan after previous scan finished")
                _start_scan()
_reap_scan() runs in the daemon reaper thread and, when _rescan_requested is set, it calls _start_scan() which creates/starts a new multiprocessing.Process. On Linux the default multiprocessing start method is fork, and forking from a non-main thread can deadlock or leave the child in an inconsistent state (because only the calling thread is replicated while locks from other threads remain held). This makes the queued-rescan path potentially unsafe in the Airflow webserver/Gunicorn environment.
Consider restructuring so process creation only happens from the main thread (e.g., have the reaper only join() + clear _current_scan, and trigger the follow-up scan via a main-thread code path), or switch ScanDagsTask creation to a thread-safe start method/context (e.g., spawn) specifically for the follow-up scan.
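The second option can be sketched with an explicit spawn context. spawn starts a fresh interpreter rather than fork()ing the current one, so locks held by other threads in the parent cannot be inherited in a locked state. _scan_target and start_follow_up_scan are hypothetical names standing in for the real scan body:

```python
import multiprocessing

# A dedicated spawn context, independent of the global start method.
_spawn_ctx = multiprocessing.get_context("spawn")


def _scan_target():
    """Placeholder for the real ScanDagsTask body (assumption)."""


def start_follow_up_scan():
    """Safe to call from a non-main thread: no fork() of a threaded parent."""
    process = _spawn_ctx.Process(target=_scan_target)
    process.start()
    return process
```

The trade-off is that spawn is slower to start and requires the target to be importable and picklable, which is why keeping process creation on the main-thread path is the simpler fix.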
Copilot flagged that _reap_scan() calling _start_scan() forks a new multiprocessing.Process from a non-main thread. On Linux with the default 'fork' start-method, this can deadlock because only the calling thread is replicated while locks held by other threads remain permanently locked in the child. Fix: the reaper thread now only join()s the process and clears module state. The _rescan_requested machinery is removed — newly deployed DAGs are discovered by the next deploy-triggered scan or by Airflow's periodic scheduler.
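The resulting shape, a reaper that only join()s and clears, can be sketched like this. The scan body is stubbed and the real names may differ; the point is that the reaper thread never creates processes.

```python
import multiprocessing
import threading
import time

_scan_lock = threading.Lock()
_current_scan = None


def _scan_body():
    """Stand-in for the real ScanDagsTask work (assumption)."""
    time.sleep(0.2)


def _reap_scan(process):
    """Reaper thread: join() and clear state only; never start processes."""
    process.join()
    global _current_scan
    with _scan_lock:
        if _current_scan is process:  # ignore reapers for replaced scans
            _current_scan = None


def scan_dags_job_background():
    global _current_scan
    with _scan_lock:
        if _current_scan is not None and _current_scan.is_alive():
            # Scan in progress; the next deploy-triggered scan or Airflow's
            # periodic scheduler will pick up newly deployed DAGs.
            return
        process = multiprocessing.Process(target=_scan_body)
        process.start()
        _current_scan = process
        threading.Thread(target=_reap_scan, args=(process,), daemon=True).start()
```

Once the scan process exits, the reaper joins it and resets _current_scan, so the next deploy call can start a fresh scan.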
Code Review ✅ Approved · 6 resolved / 6 findings

Fixes memory leak in scan_dags_job_background by adding singleton guard, addressing six concurrency and process-management issues including daemon-process crashes, silent scan skips, and stale reaper threads. No remaining issues found.

✅ 6 resolved:
- Bug: daemon=True will crash ScanDagsTask when it spawns child processes
- Edge Case: Silently skipping scan may lose deploy-triggered DAG refreshes
- Bug: _rescan_requested flag is set but never read to trigger a rescan
- Bug: Deploy after finished scan silently skips DAG scanning
- Edge Case: Deferred rescan has no automatic trigger mechanism
- …and 1 more resolved from earlier reviews
Gitar
Describe your changes:
Fixes #23646
Each time an ingestion pipeline is deployed via the OpenMetadata UI, scan_dags_job_background() spawns a new multiprocessing.Process (ScanDagsTask). Each process:

- imports the full Airflow scheduler stack (~120Mi)
- starts a SchedulerJob with heartrate=0, which the main scheduler marks as "failed"
- is never join()ed by the parent, so it becomes a zombie process whose memory is never released

After N deploys, the webserver pod accumulates N × ~120Mi of leaked memory and N orphaned "failed" SchedulerJob entries in the Airflow database.
Fix: Add a per-worker singleton guard with a reaper thread to scan_dags_job_background():

- A threading.Lock plus a module-level _current_scan reference prevents spawning multiple concurrent scan processes from the same Python worker
- A reaper thread (_reap_scan) join()s the process when it finishes, releasing resources and preventing zombies
- If a deploy arrives while a scan is running, a _rescan_requested flag is set; the reaper automatically starts one follow-up scan after the current one completes, ensuring newly deployed DAGs are always discovered
- The reaper checks the _current_scan is process identity guard, so a stale reaper (whose process was already replaced) cannot spawn duplicates
- No daemon=True on ScanDagsTask: Airflow's scheduler internals fork child processes to parse DAGs, which Python forbids from daemon processes (AssertionError: daemonic processes are not allowed to have children). The reaper thread is daemonized instead.

Before (broken):
After (fixed):
2 files changed: utils.py (+45, −4), test_scan_dags_singleton.py (new, 7 test cases).

Type of change:
Checklist: